From 4dea3a48e8a4cc56debe634d62b758ae5b2c92ce Mon Sep 17 00:00:00 2001
From: Lyndon-Li
Date: Fri, 9 Aug 2024 14:34:48 +0800
Subject: [PATCH 1/5] data mover ms smoke test

Signed-off-by: Lyndon-Li
---
 pkg/cmd/cli/datamover/backup.go               |   1 +
 pkg/cmd/cli/datamover/restore.go              |   1 +
 pkg/controller/data_download_controller.go    |  72 ++++----
 .../data_download_controller_test.go          |  49 +++++-
 pkg/controller/data_upload_controller.go      |  75 +++++----
 pkg/controller/data_upload_controller_test.go | 156 ++++++++++--------
 pkg/datamover/backup_micro_service.go         |  14 +-
 pkg/datamover/backup_micro_service_test.go    |   6 +-
 pkg/datamover/restore_micro_service.go        |  10 +-
 pkg/datamover/restore_micro_service_test.go   |   6 +-
 pkg/datapath/file_system.go                   |  46 +++++-
 pkg/datapath/file_system_test.go              |   2 +
 pkg/datapath/micro_service_watcher.go         |  94 ++++++-----
 pkg/datapath/micro_service_watcher_test.go    |   2 +-
 14 files changed, 324 insertions(+), 210 deletions(-)

diff --git a/pkg/cmd/cli/datamover/backup.go b/pkg/cmd/cli/datamover/backup.go
index 35f483d921..ca600faab5 100644
--- a/pkg/cmd/cli/datamover/backup.go
+++ b/pkg/cmd/cli/datamover/backup.go
@@ -233,6 +233,7 @@ func (s *dataMoverBackup) runDataPath() {
 	result, err := dpService.RunCancelableDataPath(s.ctx)
 	if err != nil {
+		dpService.Shutdown()
 		s.cancelFunc()
 		funcExitWithMessage(s.logger, false, "Failed to run data path service for DataUpload %s: %v", s.config.duName, err)
 		return
diff --git a/pkg/cmd/cli/datamover/restore.go b/pkg/cmd/cli/datamover/restore.go
index fb46abd409..fc74a64f15 100644
--- a/pkg/cmd/cli/datamover/restore.go
+++ b/pkg/cmd/cli/datamover/restore.go
@@ -223,6 +223,7 @@ func (s *dataMoverRestore) runDataPath() {
 	result, err := dpService.RunCancelableDataPath(s.ctx)
 	if err != nil {
 		s.cancelFunc()
+		dpService.Shutdown()
 		funcExitWithMessage(s.logger, false, "Failed to run data path service for DataDownload %s: %v", s.config.ddName, err)
 		return
 	}
diff --git a/pkg/controller/data_download_controller.go b/pkg/controller/data_download_controller.go
index f0c0c1728d..12365e03cc 100644
--- a/pkg/controller/data_download_controller.go
+++ b/pkg/controller/data_download_controller.go
@@ -217,9 +217,9 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request
 	} else if dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseAccepted {
 		if dd.Spec.Cancel {
 			log.Debugf("Data download is been canceled %s in Phase %s", dd.GetName(), dd.Status.Phase)
-			r.TryCancelDataDownload(ctx, dd, "")
+			r.tryCancelAcceptedDataDownload(ctx, dd, "")
 		} else if peekErr := r.restoreExposer.PeekExposed(ctx, getDataDownloadOwnerObject(dd)); peekErr != nil {
-			r.TryCancelDataDownload(ctx, dd, fmt.Sprintf("found a dataupload %s/%s with expose error: %s. mark it as cancel", dd.Namespace, dd.Name, peekErr))
+			r.tryCancelAcceptedDataDownload(ctx, dd, fmt.Sprintf("found a dataupload %s/%s with expose error: %s. 
mark it as cancel", dd.Namespace, dd.Name, peekErr)) log.Errorf("Cancel dd %s/%s because of expose error %s", dd.Namespace, dd.Name, peekErr) } else if dd.Status.StartTimestamp != nil { if time.Since(dd.Status.StartTimestamp.Time) >= r.preparingTimeout { @@ -272,23 +272,35 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request return r.errorOut(ctx, dd, err, "error to create data path", log) } } + + if err := r.initCancelableDataPath(ctx, asyncBR, result, log); err != nil { + log.WithError(err).Warnf("Failed to init cancelable data path for %s, will close and retry", dd.Name) + + r.closeDataPath(ctx, dd.Name) + return r.errorOut(ctx, dd, err, "error initializing data path", log) + } + // Update status to InProgress original := dd.DeepCopy() dd.Status.Phase = velerov2alpha1api.DataDownloadPhaseInProgress dd.Status.StartTimestamp = &metav1.Time{Time: r.Clock.Now()} if err := r.client.Patch(ctx, dd, client.MergeFrom(original)); err != nil { - log.WithError(err).Error("Unable to update status to in progress") - return ctrl.Result{}, err + log.WithError(err).Warnf("Failed to update datadownload %s to InProgress, will close data path and retry", dd.Name) + + r.closeDataPath(ctx, dd.Name) + return ctrl.Result{Requeue: true, RequeueAfter: time.Second * 5}, nil } log.Info("Data download is marked as in progress") - reconcileResult, err := r.runCancelableDataPath(ctx, asyncBR, dd, result, log) - if err != nil { - log.Errorf("Failed to run cancelable data path for %s with err %v", dd.Name, err) + if err := r.startCancelableDataPath(asyncBR, dd, result, log); err != nil { + log.WithError(err).Errorf("Failed to start cancelable data path for %s", dd.Name) + r.closeDataPath(ctx, dd.Name) + return r.errorOut(ctx, dd, err, "error starting data path", log) } - return reconcileResult, err + + return ctrl.Result{}, nil } else if dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseInProgress { log.Info("Data download is in progress") if dd.Spec.Cancel { @@ -331,27 +343,33 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request } } -func (r *DataDownloadReconciler) runCancelableDataPath(ctx context.Context, asyncBR datapath.AsyncBR, dd *velerov2alpha1api.DataDownload, res *exposer.ExposeResult, log logrus.FieldLogger) (reconcile.Result, error) { +func (r *DataDownloadReconciler) initCancelableDataPath(ctx context.Context, asyncBR datapath.AsyncBR, res *exposer.ExposeResult, log logrus.FieldLogger) error { + log.Info("Init cancelable dataDownload") + if err := asyncBR.Init(ctx, nil); err != nil { - return r.errorOut(ctx, dd, err, "error to initialize asyncBR", log) + return errors.Wrap(err, "error initializing asyncBR") } log.Infof("async restore init for pod %s, volume %s", res.ByPod.HostingPod.Name, res.ByPod.VolumeName) + return nil +} + +func (r *DataDownloadReconciler) startCancelableDataPath(asyncBR datapath.AsyncBR, dd *velerov2alpha1api.DataDownload, res *exposer.ExposeResult, log logrus.FieldLogger) error { + log.Info("Start cancelable dataDownload") + if err := asyncBR.StartRestore(dd.Spec.SnapshotID, datapath.AccessPoint{ ByPath: res.ByPod.VolumeName, }, dd.Spec.DataMoverConfig); err != nil { - return r.errorOut(ctx, dd, err, fmt.Sprintf("error starting async restore for pod %s, volume %s", res.ByPod.HostingPod, res.ByPod.VolumeName), log) + return errors.Wrapf(err, "error starting async restore for pod %s, volume %s", res.ByPod.HostingPod.Name, res.ByPod.VolumeName) } - log.Infof("Async restore started for pod %s, volume %s", 
res.ByPod.HostingPod, res.ByPod.VolumeName) - return ctrl.Result{}, nil + log.Info("Async restore started for pod %s, volume %s", res.ByPod.HostingPod.Name, res.ByPod.VolumeName) + return nil } func (r *DataDownloadReconciler) OnDataDownloadCompleted(ctx context.Context, namespace string, ddName string, result datapath.Result) { - defer func() { - go r.closeDataPath(ctx, ddName) - }() + defer r.dataPathMgr.RemoveAsyncBR(ddName) log := r.logger.WithField("datadownload", ddName) log.Info("Async fs restore data path completed") @@ -384,9 +402,7 @@ func (r *DataDownloadReconciler) OnDataDownloadCompleted(ctx context.Context, na } func (r *DataDownloadReconciler) OnDataDownloadFailed(ctx context.Context, namespace string, ddName string, err error) { - defer func() { - go r.closeDataPath(ctx, ddName) - }() + defer r.dataPathMgr.RemoveAsyncBR(ddName) log := r.logger.WithField("datadownload", ddName) @@ -396,16 +412,12 @@ func (r *DataDownloadReconciler) OnDataDownloadFailed(ctx context.Context, names if getErr := r.client.Get(ctx, types.NamespacedName{Name: ddName, Namespace: namespace}, &dd); getErr != nil { log.WithError(getErr).Warn("Failed to get data download on failure") } else { - if _, errOut := r.errorOut(ctx, &dd, err, "data path restore failed", log); err != nil { - log.WithError(err).Warnf("Failed to patch data download with err %v", errOut) - } + r.errorOut(ctx, &dd, err, "data path restore failed", log) } } func (r *DataDownloadReconciler) OnDataDownloadCancelled(ctx context.Context, namespace string, ddName string) { - defer func() { - go r.closeDataPath(ctx, ddName) - }() + defer r.dataPathMgr.RemoveAsyncBR(ddName) log := r.logger.WithField("datadownload", ddName) @@ -432,9 +444,9 @@ func (r *DataDownloadReconciler) OnDataDownloadCancelled(ctx context.Context, na } } -func (r *DataDownloadReconciler) TryCancelDataDownload(ctx context.Context, dd *velerov2alpha1api.DataDownload, message string) { +func (r *DataDownloadReconciler) tryCancelAcceptedDataDownload(ctx context.Context, dd *velerov2alpha1api.DataDownload, message string) { log := r.logger.WithField("datadownload", dd.Name) - log.Warn("Async fs backup data path canceled") + log.Warn("Accepted data download is canceled") succeeded, err := r.exclusiveUpdateDataDownload(ctx, dd, func(dataDownload *velerov2alpha1api.DataDownload) { dataDownload.Status.Phase = velerov2alpha1api.DataDownloadPhaseCanceled @@ -442,7 +454,10 @@ func (r *DataDownloadReconciler) TryCancelDataDownload(ctx context.Context, dd * dataDownload.Status.StartTimestamp = &metav1.Time{Time: r.Clock.Now()} } dataDownload.Status.CompletionTimestamp = &metav1.Time{Time: r.Clock.Now()} - dataDownload.Status.Message = message + + if message != "" { + dataDownload.Status.Message = message + } }) if err != nil { @@ -456,7 +471,6 @@ func (r *DataDownloadReconciler) TryCancelDataDownload(ctx context.Context, dd * // success update r.metrics.RegisterDataDownloadCancel(r.nodeName) r.restoreExposer.CleanUp(ctx, getDataDownloadOwnerObject(dd)) - r.closeDataPath(ctx, dd.Name) } func (r *DataDownloadReconciler) OnDataDownloadProgress(ctx context.Context, namespace string, ddName string, progress *uploader.Progress) { diff --git a/pkg/controller/data_download_controller_test.go b/pkg/controller/data_download_controller_test.go index c6c2697f7a..385870426f 100644 --- a/pkg/controller/data_download_controller_test.go +++ b/pkg/controller/data_download_controller_test.go @@ -192,6 +192,10 @@ func TestDataDownloadReconcile(t *testing.T) { isFSBRRestoreErr bool notNilExpose bool 
notMockCleanUp bool + mockInit bool + mockInitErr error + mockStart bool + mockStartErr error mockCancel bool mockClose bool expected *velerov2alpha1api.DataDownload @@ -264,13 +268,36 @@ func TestDataDownloadReconcile(t *testing.T) { expectedResult: &ctrl.Result{Requeue: true, RequeueAfter: time.Second * 5}, }, { - name: "Unable to update status to in progress for data download", + name: "data path init error", dd: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhasePrepared).Result(), targetPVC: builder.ForPersistentVolumeClaim("test-ns", "test-pvc").Result(), - needErrs: []bool{false, false, false, true}, + mockInit: true, + mockInitErr: errors.New("fake-data-path-init-error"), + mockClose: true, notNilExpose: true, - notMockCleanUp: true, - expectedStatusMsg: "Patch error", + expectedStatusMsg: "error initializing asyncBR: fake-data-path-init-error", + }, + { + name: "Unable to update status to in progress for data download", + dd: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhasePrepared).Result(), + targetPVC: builder.ForPersistentVolumeClaim("test-ns", "test-pvc").Result(), + needErrs: []bool{false, false, false, true}, + mockInit: true, + mockClose: true, + notNilExpose: true, + notMockCleanUp: true, + expectedResult: &ctrl.Result{Requeue: true, RequeueAfter: time.Second * 5}, + }, + { + name: "data path start error", + dd: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhasePrepared).Result(), + targetPVC: builder.ForPersistentVolumeClaim("test-ns", "test-pvc").Result(), + mockInit: true, + mockStart: true, + mockStartErr: errors.New("fake-data-path-start-error"), + mockClose: true, + notNilExpose: true, + expectedStatusMsg: "error starting async restore for pod test-name, volume test-pvc: fake-data-path-start-error", }, { name: "accept DataDownload error", @@ -399,6 +426,14 @@ func TestDataDownloadReconcile(t *testing.T) { datapath.MicroServiceBRWatcherCreator = func(kbclient.Client, kubernetes.Interface, manager.Manager, string, string, string, string, string, string, datapath.Callbacks, logrus.FieldLogger) datapath.AsyncBR { asyncBR := datapathmockes.NewAsyncBR(t) + if test.mockInit { + asyncBR.On("Init", mock.Anything, mock.Anything).Return(test.mockInitErr) + } + + if test.mockStart { + asyncBR.On("StartRestore", mock.Anything, mock.Anything, mock.Anything).Return(test.mockStartErr) + } + if test.mockCancel { asyncBR.On("Cancel").Return() } @@ -488,6 +523,10 @@ func TestDataDownloadReconcile(t *testing.T) { assert.True(t, true, apierrors.IsNotFound(err)) } + if !test.needCreateFSBR { + assert.Nil(t, r.dataPathMgr.GetAsyncBR(test.dd.Name)) + } + t.Logf("%s: \n %v \n", test.name, dd) }) } @@ -845,7 +884,7 @@ func TestTryCancelDataDownload(t *testing.T) { err = r.client.Create(ctx, test.dd) require.NoError(t, err) - r.TryCancelDataDownload(ctx, test.dd, "") + r.tryCancelAcceptedDataDownload(ctx, test.dd, "") if test.expectedErr == "" { assert.NoError(t, err) diff --git a/pkg/controller/data_upload_controller.go b/pkg/controller/data_upload_controller.go index 91413a8ccf..b10f1f6367 100644 --- a/pkg/controller/data_upload_controller.go +++ b/pkg/controller/data_upload_controller.go @@ -230,9 +230,9 @@ func (r *DataUploadReconciler) Reconcile(ctx context.Context, req ctrl.Request) // we don't want to update CR into cancel status forcely as it may conflict with CR update in Expose action // we could retry when the CR requeue in periodcally log.Debugf("Data upload is been canceled %s in Phase %s", du.GetName(), du.Status.Phase) - 
r.TryCancelDataUpload(ctx, du, "") + r.tryCancelAcceptedDataUpload(ctx, du, "") } else if peekErr := ep.PeekExposed(ctx, getOwnerObject(du)); peekErr != nil { - r.TryCancelDataUpload(ctx, du, fmt.Sprintf("found a dataupload %s/%s with expose error: %s. mark it as cancel", du.Namespace, du.Name, peekErr)) + r.tryCancelAcceptedDataUpload(ctx, du, fmt.Sprintf("found a dataupload %s/%s with expose error: %s. mark it as cancel", du.Namespace, du.Name, peekErr)) log.Errorf("Cancel du %s/%s because of expose error %s", du.Namespace, du.Name, peekErr) } else if du.Status.StartTimestamp != nil { if time.Since(du.Status.StartTimestamp.Time) >= r.preparingTimeout { @@ -283,21 +283,35 @@ func (r *DataUploadReconciler) Reconcile(ctx context.Context, req ctrl.Request) return r.errorOut(ctx, du, err, "error to create data path", log) } } + + if err := r.initCancelableDataPath(ctx, asyncBR, res, log); err != nil { + log.WithError(err).Warnf("Failed to init cancelable data path for %s, will close and retry", du.Name) + + r.closeDataPath(ctx, du.Name) + return r.errorOut(ctx, du, err, "error initializing data path", log) + } + // Update status to InProgress original := du.DeepCopy() du.Status.Phase = velerov2alpha1api.DataUploadPhaseInProgress du.Status.StartTimestamp = &metav1.Time{Time: r.Clock.Now()} if err := r.client.Patch(ctx, du, client.MergeFrom(original)); err != nil { - return r.errorOut(ctx, du, err, "error updating dataupload status", log) + log.WithError(err).Warnf("Failed to update dataupload %s to InProgress, will data path close and retry", du.Name) + + r.closeDataPath(ctx, du.Name) + return ctrl.Result{Requeue: true, RequeueAfter: time.Second * 5}, nil } log.Info("Data upload is marked as in progress") - result, err := r.runCancelableDataUpload(ctx, asyncBR, du, res, log) - if err != nil { - log.Errorf("Failed to run cancelable data path for %s with err %v", du.Name, err) + + if err := r.startCancelableDataPath(asyncBR, du, res, log); err != nil { + log.WithError(err).Errorf("Failed to start cancelable data path for %s", du.Name) r.closeDataPath(ctx, du.Name) + + return r.errorOut(ctx, du, err, "error starting data path", log) } - return result, err + + return ctrl.Result{}, nil } else if du.Status.Phase == velerov2alpha1api.DataUploadPhaseInProgress { log.Info("Data upload is in progress") if du.Spec.Cancel { @@ -340,29 +354,33 @@ func (r *DataUploadReconciler) Reconcile(ctx context.Context, req ctrl.Request) } } -func (r *DataUploadReconciler) runCancelableDataUpload(ctx context.Context, asyncBR datapath.AsyncBR, du *velerov2alpha1api.DataUpload, res *exposer.ExposeResult, log logrus.FieldLogger) (reconcile.Result, error) { - log.Info("Run cancelable dataUpload") +func (r *DataUploadReconciler) initCancelableDataPath(ctx context.Context, asyncBR datapath.AsyncBR, res *exposer.ExposeResult, log logrus.FieldLogger) error { + log.Info("Init cancelable dataUpload") if err := asyncBR.Init(ctx, nil); err != nil { - return r.errorOut(ctx, du, err, "error to initialize asyncBR", log) + return errors.Wrap(err, "error initializing asyncBR") } log.Infof("async backup init for pod %s, volume %s", res.ByPod.HostingPod.Name, res.ByPod.VolumeName) + return nil +} + +func (r *DataUploadReconciler) startCancelableDataPath(asyncBR datapath.AsyncBR, du *velerov2alpha1api.DataUpload, res *exposer.ExposeResult, log logrus.FieldLogger) error { + log.Info("Start cancelable dataUpload") + if err := asyncBR.StartBackup(datapath.AccessPoint{ ByPath: res.ByPod.VolumeName, }, du.Spec.DataMoverConfig, nil); err != 
nil { - return r.errorOut(ctx, du, err, fmt.Sprintf("error starting async backup for pod %s, volume %s", res.ByPod.HostingPod, res.ByPod.VolumeName), log) + return errors.Wrapf(err, "error starting async backup for pod %s, volume %s", res.ByPod.HostingPod.Name, res.ByPod.VolumeName) } - log.Infof("Async backup started for pod %s, volume %s", res.ByPod.HostingPod, res.ByPod.VolumeName) - return ctrl.Result{}, nil + log.Info("Async backup started for pod %s, volume %s", res.ByPod.HostingPod.Name, res.ByPod.VolumeName) + return nil } func (r *DataUploadReconciler) OnDataUploadCompleted(ctx context.Context, namespace string, duName string, result datapath.Result) { - defer func() { - go r.closeDataPath(ctx, duName) - }() + defer r.dataPathMgr.RemoveAsyncBR(duName) log := r.logger.WithField("dataupload", duName) @@ -406,9 +424,7 @@ func (r *DataUploadReconciler) OnDataUploadCompleted(ctx context.Context, namesp } func (r *DataUploadReconciler) OnDataUploadFailed(ctx context.Context, namespace, duName string, err error) { - defer func() { - go r.closeDataPath(ctx, duName) - }() + defer r.dataPathMgr.RemoveAsyncBR(duName) log := r.logger.WithField("dataupload", duName) @@ -418,16 +434,12 @@ func (r *DataUploadReconciler) OnDataUploadFailed(ctx context.Context, namespace if getErr := r.client.Get(ctx, types.NamespacedName{Name: duName, Namespace: namespace}, &du); getErr != nil { log.WithError(getErr).Warn("Failed to get dataupload on failure") } else { - if _, errOut := r.errorOut(ctx, &du, err, "data path backup failed", log); err != nil { - log.WithError(err).Warnf("Failed to patch dataupload with err %v", errOut) - } + r.errorOut(ctx, &du, err, "data path backup failed", log) } } func (r *DataUploadReconciler) OnDataUploadCancelled(ctx context.Context, namespace string, duName string) { - defer func() { - go r.closeDataPath(ctx, duName) - }() + defer r.dataPathMgr.RemoveAsyncBR(duName) log := r.logger.WithField("dataupload", duName) @@ -453,17 +465,19 @@ func (r *DataUploadReconciler) OnDataUploadCancelled(ctx context.Context, namesp } } -// TryCancelDataUpload clear up resources only when update success -func (r *DataUploadReconciler) TryCancelDataUpload(ctx context.Context, du *velerov2alpha1api.DataUpload, message string) { +func (r *DataUploadReconciler) tryCancelAcceptedDataUpload(ctx context.Context, du *velerov2alpha1api.DataUpload, message string) { log := r.logger.WithField("dataupload", du.Name) - log.Warn("Async fs backup data path canceled") + log.Warn("Accepted data upload is canceled") succeeded, err := r.exclusiveUpdateDataUpload(ctx, du, func(dataUpload *velerov2alpha1api.DataUpload) { dataUpload.Status.Phase = velerov2alpha1api.DataUploadPhaseCanceled if dataUpload.Status.StartTimestamp.IsZero() { dataUpload.Status.StartTimestamp = &metav1.Time{Time: r.Clock.Now()} } dataUpload.Status.CompletionTimestamp = &metav1.Time{Time: r.Clock.Now()} - dataUpload.Status.Message = message + + if message != "" { + dataUpload.Status.Message = message + } }) if err != nil { @@ -478,7 +492,6 @@ func (r *DataUploadReconciler) TryCancelDataUpload(ctx context.Context, du *vele r.metrics.RegisterDataUploadCancel(r.nodeName) // cleans up any objects generated during the snapshot expose r.cleanUp(ctx, du, log) - r.closeDataPath(ctx, du.Name) } func (r *DataUploadReconciler) cleanUp(ctx context.Context, du *velerov2alpha1api.DataUpload, log *logrus.Entry) { @@ -692,7 +705,7 @@ func (r *DataUploadReconciler) errorOut(ctx context.Context, du *velerov2alpha1a } se.CleanUp(ctx, getOwnerObject(du), 
volumeSnapshotName, du.Spec.SourceNamespace) } else { - err = errors.Wrapf(err, "failed to clean up exposed snapshot with could not find %s snapshot exposer", du.Spec.SnapshotType) + log.Warnf("failed to clean up exposed snapshot could not find %s snapshot exposer", du.Spec.SnapshotType) } return ctrl.Result{}, r.updateStatusToFailed(ctx, du, err, msg, log) diff --git a/pkg/controller/data_upload_controller_test.go b/pkg/controller/data_upload_controller_test.go index ee73372b1b..ea7603b904 100644 --- a/pkg/controller/data_upload_controller_test.go +++ b/pkg/controller/data_upload_controller_test.go @@ -306,20 +306,16 @@ type fakeDataUploadFSBR struct { du *velerov2alpha1api.DataUpload kubeClient kbclient.Client clock clock.WithTickerAndDelayedExecution + initErr error + startErr error } func (f *fakeDataUploadFSBR) Init(ctx context.Context, param interface{}) error { - return nil + return f.initErr } func (f *fakeDataUploadFSBR) StartBackup(source datapath.AccessPoint, uploaderConfigs map[string]string, param interface{}) error { - du := f.du - original := f.du.DeepCopy() - du.Status.Phase = velerov2alpha1api.DataUploadPhaseCompleted - du.Status.CompletionTimestamp = &metav1.Time{Time: f.clock.Now()} - f.kubeClient.Patch(context.Background(), du, kbclient.MergeFrom(original)) - - return nil + return f.startErr } func (f *fakeDataUploadFSBR) StartRestore(snapshotID string, target datapath.AccessPoint, uploaderConfigs map[string]string) error { @@ -348,27 +344,24 @@ func TestReconcile(t *testing.T) { needErrs []bool peekErr error notCreateFSBR bool + fsBRInitErr error + fsBRStartErr error }{ { - name: "Dataupload is not initialized", - du: builder.ForDataUpload("unknown-ns", "unknown-name").Result(), - expectedProcessed: false, - expected: nil, - expectedRequeue: ctrl.Result{}, + name: "Dataupload is not initialized", + du: builder.ForDataUpload("unknown-ns", "unknown-name").Result(), + expectedRequeue: ctrl.Result{}, }, { - name: "Error get Dataupload", - du: builder.ForDataUpload(velerov1api.DefaultNamespace, "unknown-name").Result(), - expectedProcessed: false, - expected: nil, - expectedRequeue: ctrl.Result{}, - expectedErrMsg: "getting DataUpload: Get error", - needErrs: []bool{true, false, false, false}, + name: "Error get Dataupload", + du: builder.ForDataUpload(velerov1api.DefaultNamespace, "unknown-name").Result(), + expectedRequeue: ctrl.Result{}, + expectedErrMsg: "getting DataUpload: Get error", + needErrs: []bool{true, false, false, false}, }, { - name: "Unsupported data mover type", - du: dataUploadBuilder().DataMover("unknown type").Result(), - expectedProcessed: false, - expected: dataUploadBuilder().Phase("").Result(), - expectedRequeue: ctrl.Result{}, + name: "Unsupported data mover type", + du: dataUploadBuilder().DataMover("unknown type").Result(), + expected: dataUploadBuilder().Phase("").Result(), + expectedRequeue: ctrl.Result{}, }, { name: "Unknown type of snapshot exposer is not initialized", du: dataUploadBuilder().SnapshotType("unknown type").Result(), @@ -377,13 +370,12 @@ func TestReconcile(t *testing.T) { expectedRequeue: ctrl.Result{}, expectedErrMsg: "unknown type type of snapshot exposer is not exist", }, { - name: "Dataupload should be accepted", - du: dataUploadBuilder().Result(), - pod: builder.ForPod("fake-ns", dataUploadName).Volumes(&corev1.Volume{Name: "test-pvc"}).Result(), - pvc: builder.ForPersistentVolumeClaim("fake-ns", "test-pvc").Result(), - expectedProcessed: false, - expected: 
dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseAccepted).Result(), - expectedRequeue: ctrl.Result{}, + name: "Dataupload should be accepted", + du: dataUploadBuilder().Result(), + pod: builder.ForPod("fake-ns", dataUploadName).Volumes(&corev1.Volume{Name: "test-pvc"}).Result(), + pvc: builder.ForPersistentVolumeClaim("fake-ns", "test-pvc").Result(), + expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseAccepted).Result(), + expectedRequeue: ctrl.Result{}, }, { name: "Dataupload should fail to get PVC information", @@ -395,34 +387,31 @@ func TestReconcile(t *testing.T) { expectedErrMsg: "failed to get PVC", }, { - name: "Dataupload should be prepared", - du: dataUploadBuilder().SnapshotType(fakeSnapshotType).Result(), - expectedProcessed: false, - expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhasePrepared).Result(), - expectedRequeue: ctrl.Result{}, - }, { - name: "Dataupload prepared should be completed", - pod: builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Volumes(&corev1.Volume{Name: "dataupload-1"}).Result(), - du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhasePrepared).SnapshotType(fakeSnapshotType).Result(), - expectedProcessed: true, - expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseCompleted).Result(), - expectedRequeue: ctrl.Result{}, + name: "Dataupload should be prepared", + du: dataUploadBuilder().SnapshotType(fakeSnapshotType).Result(), + expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhasePrepared).Result(), + expectedRequeue: ctrl.Result{}, }, { - name: "Dataupload with not enabled cancel", - pod: builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Volumes(&corev1.Volume{Name: "dataupload-1"}).Result(), - du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseInProgress).SnapshotType(fakeSnapshotType).Cancel(false).Result(), - expectedProcessed: false, - expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseInProgress).Result(), - expectedRequeue: ctrl.Result{}, + name: "Dataupload prepared should be completed", + pod: builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Volumes(&corev1.Volume{Name: "dataupload-1"}).Result(), + du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhasePrepared).SnapshotType(fakeSnapshotType).Result(), + expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseInProgress).Result(), + expectedRequeue: ctrl.Result{}, }, { - name: "Dataupload should be cancel", - pod: builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Volumes(&corev1.Volume{Name: "dataupload-1"}).Result(), - du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseInProgress).SnapshotType(fakeSnapshotType).Cancel(true).Result(), - expectedProcessed: false, - expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseCanceling).Result(), - expectedRequeue: ctrl.Result{}, + name: "Dataupload with not enabled cancel", + pod: builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Volumes(&corev1.Volume{Name: "dataupload-1"}).Result(), + du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseInProgress).SnapshotType(fakeSnapshotType).Cancel(false).Result(), + expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseInProgress).Result(), + expectedRequeue: ctrl.Result{}, + }, + { + name: "Dataupload should be cancel", + pod: builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Volumes(&corev1.Volume{Name: "dataupload-1"}).Result(), + du: 
dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseInProgress).SnapshotType(fakeSnapshotType).Cancel(true).Result(), + expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseCanceling).Result(), + expectedRequeue: ctrl.Result{}, }, { name: "Dataupload should be cancel with match node", @@ -445,19 +434,43 @@ func TestReconcile(t *testing.T) { du.Status.Node = "different_node" return du }(), - expectedProcessed: false, - expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseInProgress).Result(), - expectedRequeue: ctrl.Result{}, - notCreateFSBR: true, + expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseInProgress).Result(), + expectedRequeue: ctrl.Result{}, + notCreateFSBR: true, + }, + { + name: "runCancelableDataUpload is concurrent limited", + dataMgr: datapath.NewManager(0), + pod: builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Volumes(&corev1.Volume{Name: "dataupload-1"}).Result(), + du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhasePrepared).SnapshotType(fakeSnapshotType).Result(), + expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhasePrepared).Result(), + expectedRequeue: ctrl.Result{Requeue: true, RequeueAfter: time.Second * 5}, }, { - name: "runCancelableDataUpload is concurrent limited", - dataMgr: datapath.NewManager(0), + name: "data path init error", pod: builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Volumes(&corev1.Volume{Name: "dataupload-1"}).Result(), du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhasePrepared).SnapshotType(fakeSnapshotType).Result(), - expectedProcessed: false, - expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhasePrepared).Result(), - expectedRequeue: ctrl.Result{Requeue: true, RequeueAfter: time.Second * 5}, + fsBRInitErr: errors.New("fake-data-path-init-error"), + expectedProcessed: true, + expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseFailed).SnapshotType(fakeSnapshotType).Result(), + expectedErrMsg: "error initializing asyncBR: fake-data-path-init-error", + }, + { + name: "Unable to update status to in progress for data download", + pod: builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Volumes(&corev1.Volume{Name: "dataupload-1"}).Result(), + du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhasePrepared).SnapshotType(fakeSnapshotType).Result(), + needErrs: []bool{false, false, false, true}, + expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhasePrepared).SnapshotType(fakeSnapshotType).Result(), + expectedRequeue: ctrl.Result{Requeue: true, RequeueAfter: time.Second * 5}, + }, + { + name: "data path start error", + pod: builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Volumes(&corev1.Volume{Name: "dataupload-1"}).Result(), + du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhasePrepared).SnapshotType(fakeSnapshotType).Result(), + fsBRStartErr: errors.New("fake-data-path-start-error"), + expectedProcessed: true, + expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseFailed).SnapshotType(fakeSnapshotType).Result(), + expectedErrMsg: "error starting async backup for pod dataupload-1, volume dataupload-1: fake-data-path-start-error", }, { name: "prepare timeout", @@ -480,7 +493,6 @@ func TestReconcile(t *testing.T) { du.DeletionTimestamp = &metav1.Time{Time: time.Now()} return du }(), - expectedProcessed: false, checkFunc: func(du velerov2alpha1api.DataUpload) bool { return du.Spec.Cancel }, @@ -496,7 +508,6 @@ func 
TestReconcile(t *testing.T) { du.DeletionTimestamp = &metav1.Time{Time: time.Now()} return du }(), - expectedProcessed: false, checkFunc: func(du velerov2alpha1api.DataUpload) bool { return !controllerutil.ContainsFinalizer(&du, DataUploadDownloadFinalizer) }, @@ -555,12 +566,16 @@ func TestReconcile(t *testing.T) { du: test.du, kubeClient: r.client, clock: r.Clock, + initErr: test.fsBRInitErr, + startErr: test.fsBRStartErr, } } } + testCreateFsBR := false if test.du.Status.Phase == velerov2alpha1api.DataUploadPhaseInProgress && !test.notCreateFSBR { if fsBR := r.dataPathMgr.GetAsyncBR(test.du.Name); fsBR == nil { + testCreateFsBR = true _, err := r.dataPathMgr.CreateMicroServiceBRWatcher(ctx, r.client, nil, nil, datapath.TaskTypeBackup, test.du.Name, velerov1api.DefaultNamespace, "", "", "", datapath.Callbacks{OnCancelled: r.OnDataUploadCancelled}, false, velerotest.NewLogger()) require.NoError(t, err) } @@ -605,6 +620,11 @@ func TestReconcile(t *testing.T) { if test.checkFunc != nil { assert.True(t, test.checkFunc(du)) } + + if !testCreateFsBR && du.Status.Phase != velerov2alpha1api.DataUploadPhaseInProgress { + assert.Nil(t, r.dataPathMgr.GetAsyncBR(test.du.Name)) + } + }) } } @@ -926,7 +946,7 @@ func TestTryCancelDataUpload(t *testing.T) { err = r.client.Create(ctx, test.dd) require.NoError(t, err) - r.TryCancelDataUpload(ctx, test.dd, "") + r.tryCancelAcceptedDataUpload(ctx, test.dd, "") if test.expectedErr == "" { assert.NoError(t, err) diff --git a/pkg/datamover/backup_micro_service.go b/pkg/datamover/backup_micro_service.go index cedacc0ce4..fa48b3523e 100644 --- a/pkg/datamover/backup_micro_service.go +++ b/pkg/datamover/backup_micro_service.go @@ -127,15 +127,13 @@ func (r *BackupMicroService) Init() error { return err } -var waitControllerTimeout time.Duration = time.Minute * 2 - func (r *BackupMicroService) RunCancelableDataPath(ctx context.Context) (string, error) { log := r.logger.WithFields(logrus.Fields{ "dataupload": r.dataUploadName, }) du := &velerov2alpha1api.DataUpload{} - err := wait.PollUntilContextTimeout(ctx, 500*time.Millisecond, waitControllerTimeout, true, func(ctx context.Context) (bool, error) { + err := wait.PollUntilContextCancel(ctx, 500*time.Millisecond, true, func(ctx context.Context) (bool, error) { err := r.client.Get(ctx, types.NamespacedName{ Namespace: r.namespace, Name: r.dataUploadName, @@ -241,8 +239,6 @@ func (r *BackupMicroService) Shutdown() { var funcMarshal = json.Marshal func (r *BackupMicroService) OnDataUploadCompleted(ctx context.Context, namespace string, duName string, result datapath.Result) { - defer r.closeDataPath(ctx, duName) - log := r.logger.WithField("dataupload", duName) backupBytes, err := funcMarshal(result.Backup) @@ -262,8 +258,6 @@ func (r *BackupMicroService) OnDataUploadCompleted(ctx context.Context, namespac } func (r *BackupMicroService) OnDataUploadFailed(ctx context.Context, namespace string, duName string, err error) { - defer r.closeDataPath(ctx, duName) - log := r.logger.WithField("dataupload", duName) log.WithError(err).Error("Async fs backup data path failed") @@ -274,8 +268,6 @@ func (r *BackupMicroService) OnDataUploadFailed(ctx context.Context, namespace s } func (r *BackupMicroService) OnDataUploadCancelled(ctx context.Context, namespace string, duName string) { - defer r.closeDataPath(ctx, duName) - log := r.logger.WithField("dataupload", duName) log.Warn("Async fs backup data path canceled") @@ -296,8 +288,6 @@ func (r *BackupMicroService) OnDataUploadProgress(ctx context.Context, namespace return } - 
log.Infof("Sending event for progress %v (%s)", progress, string(progressBytes)) - r.eventRecorder.Event(r.dataUpload, false, datapath.EventReasonProgress, string(progressBytes)) } @@ -313,7 +303,7 @@ func (r *BackupMicroService) closeDataPath(ctx context.Context, duName string) { func (r *BackupMicroService) cancelDataUpload(du *velerov2alpha1api.DataUpload) { r.logger.WithField("DataUpload", du.Name).Info("Data upload is being canceled") - r.eventRecorder.Event(du, false, "Canceling", "Canceling for data upload %s", du.Name) + r.eventRecorder.Event(du, false, datapath.EventReasonCancelling, "Canceling for data upload %s", du.Name) fsBackup := r.dataPathMgr.GetAsyncBR(du.Name) if fsBackup == nil { diff --git a/pkg/datamover/backup_micro_service_test.go b/pkg/datamover/backup_micro_service_test.go index 9db38d4c0d..cca428ee88 100644 --- a/pkg/datamover/backup_micro_service_test.go +++ b/pkg/datamover/backup_micro_service_test.go @@ -301,12 +301,12 @@ func TestRunCancelableDataPath(t *testing.T) { }{ { name: "no du", - ctx: context.Background(), + ctx: ctxTimeout, expectedErr: "error waiting for du: context deadline exceeded", }, { name: "du not in in-progress", - ctx: context.Background(), + ctx: ctxTimeout, kubeClientObj: []runtime.Object{du}, expectedErr: "error waiting for du: context deadline exceeded", }, @@ -412,8 +412,6 @@ func TestRunCancelableDataPath(t *testing.T) { return fsBR } - waitControllerTimeout = time.Second - if test.result != nil { go func() { time.Sleep(time.Millisecond * 500) diff --git a/pkg/datamover/restore_micro_service.go b/pkg/datamover/restore_micro_service.go index 508469701a..d0a4c6f50c 100644 --- a/pkg/datamover/restore_micro_service.go +++ b/pkg/datamover/restore_micro_service.go @@ -122,7 +122,7 @@ func (r *RestoreMicroService) RunCancelableDataPath(ctx context.Context) (string }) dd := &velerov2alpha1api.DataDownload{} - err := wait.PollUntilContextTimeout(ctx, 500*time.Millisecond, waitControllerTimeout, true, func(ctx context.Context) (bool, error) { + err := wait.PollUntilContextCancel(ctx, 500*time.Millisecond, true, func(ctx context.Context) (bool, error) { err := r.client.Get(ctx, types.NamespacedName{ Namespace: r.namespace, Name: r.dataDownloadName, @@ -214,8 +214,6 @@ func (r *RestoreMicroService) Shutdown() { } func (r *RestoreMicroService) OnDataDownloadCompleted(ctx context.Context, namespace string, ddName string, result datapath.Result) { - defer r.closeDataPath(ctx, ddName) - log := r.logger.WithField("datadownload", ddName) restoreBytes, err := funcMarshal(result.Restore) @@ -235,8 +233,6 @@ func (r *RestoreMicroService) OnDataDownloadCompleted(ctx context.Context, names } func (r *RestoreMicroService) OnDataDownloadFailed(ctx context.Context, namespace string, ddName string, err error) { - defer r.closeDataPath(ctx, ddName) - log := r.logger.WithField("datadownload", ddName) log.WithError(err).Error("Async fs restore data path failed") @@ -247,8 +243,6 @@ func (r *RestoreMicroService) OnDataDownloadFailed(ctx context.Context, namespac } func (r *RestoreMicroService) OnDataDownloadCancelled(ctx context.Context, namespace string, ddName string) { - defer r.closeDataPath(ctx, ddName) - log := r.logger.WithField("datadownload", ddName) log.Warn("Async fs restore data path canceled") @@ -284,7 +278,7 @@ func (r *RestoreMicroService) closeDataPath(ctx context.Context, ddName string) func (r *RestoreMicroService) cancelDataDownload(dd *velerov2alpha1api.DataDownload) { r.logger.WithField("DataDownload", dd.Name).Info("Data download is being 
canceled") - r.eventRecorder.Event(dd, false, "Canceling", "Canceling for data download %s", dd.Name) + r.eventRecorder.Event(dd, false, datapath.EventReasonCancelling, "Canceling for data download %s", dd.Name) fsBackup := r.dataPathMgr.GetAsyncBR(dd.Name) if fsBackup == nil { diff --git a/pkg/datamover/restore_micro_service_test.go b/pkg/datamover/restore_micro_service_test.go index e3ef8701da..8a3ed61e1f 100644 --- a/pkg/datamover/restore_micro_service_test.go +++ b/pkg/datamover/restore_micro_service_test.go @@ -254,12 +254,12 @@ func TestRunCancelableRestore(t *testing.T) { }{ { name: "no dd", - ctx: context.Background(), + ctx: ctxTimeout, expectedErr: "error waiting for dd: context deadline exceeded", }, { name: "dd not in in-progress", - ctx: context.Background(), + ctx: ctxTimeout, kubeClientObj: []runtime.Object{dd}, expectedErr: "error waiting for dd: context deadline exceeded", }, @@ -365,8 +365,6 @@ func TestRunCancelableRestore(t *testing.T) { return fsBR } - waitControllerTimeout = time.Second - if test.result != nil { go func() { time.Sleep(time.Millisecond * 500) diff --git a/pkg/datapath/file_system.go b/pkg/datapath/file_system.go index 762c91b188..5d3b54f281 100644 --- a/pkg/datapath/file_system.go +++ b/pkg/datapath/file_system.go @@ -18,6 +18,7 @@ package datapath import ( "context" + "sync" "github.com/pkg/errors" "github.com/sirupsen/logrus" @@ -66,6 +67,8 @@ type fileSystemBR struct { callbacks Callbacks jobName string requestorType string + wgDataPath sync.WaitGroup + dataPathLock sync.Mutex } func newFileSystemBR(jobName string, requestorType string, client client.Client, namespace string, callbacks Callbacks, log logrus.FieldLogger) AsyncBR { @@ -75,6 +78,7 @@ func newFileSystemBR(jobName string, requestorType string, client client.Client, client: client, namespace: namespace, callbacks: callbacks, + wgDataPath: sync.WaitGroup{}, log: log, } @@ -134,6 +138,23 @@ func (fs *fileSystemBR) Init(ctx context.Context, param interface{}) error { } func (fs *fileSystemBR) Close(ctx context.Context) { + if fs.cancel != nil { + fs.cancel() + } + + fs.log.WithField("user", fs.jobName).Info("Closing FileSystemBR") + + fs.wgDataPath.Wait() + + fs.close(ctx) + + fs.log.WithField("user", fs.jobName).Info("FileSystemBR is closed") +} + +func (fs *fileSystemBR) close(ctx context.Context) { + fs.dataPathLock.Lock() + defer fs.dataPathLock.Unlock() + if fs.uploaderProv != nil { if err := fs.uploaderProv.Close(ctx); err != nil { fs.log.Errorf("failed to close uploader provider with error %v", err) @@ -141,13 +162,6 @@ func (fs *fileSystemBR) Close(ctx context.Context) { fs.uploaderProv = nil } - - if fs.cancel != nil { - fs.cancel() - fs.cancel = nil - } - - fs.log.WithField("user", fs.jobName).Info("FileSystemBR is closed") } func (fs *fileSystemBR) StartBackup(source AccessPoint, uploaderConfig map[string]string, param interface{}) error { @@ -155,9 +169,18 @@ func (fs *fileSystemBR) StartBackup(source AccessPoint, uploaderConfig map[strin return errors.New("file system data path is not initialized") } + fs.wgDataPath.Add(1) + backupParam := param.(*FSBRStartParam) go func() { + fs.log.Info("Start data path backup") + + defer func() { + fs.close(context.Background()) + fs.wgDataPath.Done() + }() + snapshotID, emptySnapshot, err := fs.uploaderProv.RunBackup(fs.ctx, source.ByPath, backupParam.RealSource, backupParam.Tags, backupParam.ForceFull, backupParam.ParentSnapshot, source.VolMode, uploaderConfig, fs) @@ -182,7 +205,16 @@ func (fs *fileSystemBR) StartRestore(snapshotID string, 
target AccessPoint, uplo return errors.New("file system data path is not initialized") } + fs.wgDataPath.Add(1) + go func() { + fs.log.Info("Start data path restore") + + defer func() { + fs.close(context.Background()) + fs.wgDataPath.Done() + }() + err := fs.uploaderProv.RunRestore(fs.ctx, snapshotID, target.ByPath, target.VolMode, uploaderConfigs, fs) if err == provider.ErrorCanceled { diff --git a/pkg/datapath/file_system_test.go b/pkg/datapath/file_system_test.go index 85c6df08d9..fab33df1c0 100644 --- a/pkg/datapath/file_system_test.go +++ b/pkg/datapath/file_system_test.go @@ -96,6 +96,7 @@ func TestAsyncBackup(t *testing.T) { fs := newFileSystemBR("job-1", "test", nil, "velero", Callbacks{}, velerotest.NewLogger()).(*fileSystemBR) mockProvider := providerMock.NewProvider(t) mockProvider.On("RunBackup", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(test.result.Backup.SnapshotID, test.result.Backup.EmptySnapshot, test.err) + mockProvider.On("Close", mock.Anything).Return(nil) fs.uploaderProv = mockProvider fs.initialized = true fs.callbacks = test.callbacks @@ -179,6 +180,7 @@ func TestAsyncRestore(t *testing.T) { fs := newFileSystemBR("job-1", "test", nil, "velero", Callbacks{}, velerotest.NewLogger()).(*fileSystemBR) mockProvider := providerMock.NewProvider(t) mockProvider.On("RunRestore", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(test.err) + mockProvider.On("Close", mock.Anything).Return(nil) fs.uploaderProv = mockProvider fs.initialized = true fs.callbacks = test.callbacks diff --git a/pkg/datapath/micro_service_watcher.go b/pkg/datapath/micro_service_watcher.go index a5826459a4..8a129bde8f 100644 --- a/pkg/datapath/micro_service_watcher.go +++ b/pkg/datapath/micro_service_watcher.go @@ -46,11 +46,12 @@ const ( ErrCancelled = "data path is canceled" - EventReasonStarted = "Data-Path-Started" - EventReasonCompleted = "Data-Path-Completed" - EventReasonFailed = "Data-Path-Failed" - EventReasonCancelled = "Data-Path-Canceled" - EventReasonProgress = "Data-Path-Progress" + EventReasonStarted = "Data-Path-Started" + EventReasonCompleted = "Data-Path-Completed" + EventReasonFailed = "Data-Path-Failed" + EventReasonCancelled = "Data-Path-Canceled" + EventReasonProgress = "Data-Path-Progress" + EventReasonCancelling = "Data-Path-Canceling" ) type microServiceBRWatcher struct { @@ -76,6 +77,7 @@ type microServiceBRWatcher struct { podInformer ctrlcache.Informer eventHandler cache.ResourceEventHandlerRegistration podHandler cache.ResourceEventHandlerRegistration + watcherLock sync.Mutex } func newMicroServiceBRWatcher(client client.Client, kubeClient kubernetes.Interface, mgr manager.Manager, taskType string, taskName string, namespace string, @@ -121,8 +123,6 @@ func (ms *microServiceBRWatcher) Init(ctx context.Context, param interface{}) er return } - ms.log.Infof("Pushed adding event %s/%s, message %s for object %v", evt.Namespace, evt.Name, evt.Message, evt.InvolvedObject) - ms.eventCh <- evt }, UpdateFunc: func(_, obj interface{}) { @@ -131,8 +131,6 @@ func (ms *microServiceBRWatcher) Init(ctx context.Context, param interface{}) er return } - ms.log.Infof("Pushed updating event %s/%s, message %s for object %v", evt.Namespace, evt.Name, evt.Message, evt.InvolvedObject) - ms.eventCh <- evt }, }, @@ -177,12 +175,9 @@ func (ms *microServiceBRWatcher) Init(ctx context.Context, param interface{}) er } }() - ms.log.WithFields( - logrus.Fields{ 
- "taskType": ms.taskType, - "taskName": ms.taskName, - "thisPod": ms.thisPod, - }).Info("MicroServiceBR is initialized") + if err := ms.reEnsureThisPod(ctx); err != nil { + return err + } ms.eventInformer = eventInformer ms.podInformer = podInformer @@ -191,42 +186,56 @@ func (ms *microServiceBRWatcher) Init(ctx context.Context, param interface{}) er ms.ctx, ms.cancel = context.WithCancel(ctx) + ms.log.WithFields( + logrus.Fields{ + "taskType": ms.taskType, + "taskName": ms.taskName, + "thisPod": ms.thisPod, + }).Info("MicroServiceBR is initialized") + succeeded = true return nil + } func (ms *microServiceBRWatcher) Close(ctx context.Context) { if ms.cancel != nil { ms.cancel() - ms.cancel = nil } ms.log.WithField("taskType", ms.taskType).WithField("taskName", ms.taskName).Info("Closing MicroServiceBR") ms.wgWatcher.Wait() - if ms.eventInformer != nil && ms.eventHandler != nil { + ms.close() + + ms.log.WithField("taskType", ms.taskType).WithField("taskName", ms.taskName).Info("MicroServiceBR is closed") +} + +func (ms *microServiceBRWatcher) close() { + ms.watcherLock.Lock() + defer ms.watcherLock.Unlock() + + if ms.eventHandler != nil { if err := ms.eventInformer.RemoveEventHandler(ms.eventHandler); err != nil { ms.log.WithError(err).Warn("Failed to remove event handler") } + + ms.eventHandler = nil } - if ms.podInformer != nil && ms.podHandler != nil { + if ms.podHandler != nil { if err := ms.podInformer.RemoveEventHandler(ms.podHandler); err != nil { ms.log.WithError(err).Warn("Failed to remove pod handler") } - } - ms.log.WithField("taskType", ms.taskType).WithField("taskName", ms.taskName).Info("MicroServiceBR is closed") + ms.podHandler = nil + } } func (ms *microServiceBRWatcher) StartBackup(source AccessPoint, uploaderConfig map[string]string, param interface{}) error { - ms.log.Infof("Start watching backup ms for source %v", source) - - if err := ms.reEnsureThisPod(); err != nil { - return err - } + ms.log.Infof("Start watching backup ms for source %v", source.ByPath) ms.startWatch() @@ -234,20 +243,16 @@ func (ms *microServiceBRWatcher) StartBackup(source AccessPoint, uploaderConfig } func (ms *microServiceBRWatcher) StartRestore(snapshotID string, target AccessPoint, uploaderConfigs map[string]string) error { - ms.log.Infof("Start watching restore ms to target %v, from snapshot %s", target, snapshotID) - - if err := ms.reEnsureThisPod(); err != nil { - return err - } + ms.log.Infof("Start watching restore ms to target %s, from snapshot %s", target.ByPath, snapshotID) ms.startWatch() return nil } -func (ms *microServiceBRWatcher) reEnsureThisPod() error { +func (ms *microServiceBRWatcher) reEnsureThisPod(ctx context.Context) error { thisPod := &v1.Pod{} - if err := ms.client.Get(ms.ctx, types.NamespacedName{ + if err := ms.client.Get(ctx, types.NamespacedName{ Namespace: ms.namespace, Name: ms.thisPod, }, thisPod); err != nil { @@ -275,6 +280,11 @@ func (ms *microServiceBRWatcher) startWatch() { go func() { ms.log.Info("Start watching data path pod") + defer func() { + ms.close() + ms.wgWatcher.Done() + }() + var lastPod *v1.Pod watchLoop: @@ -291,14 +301,16 @@ func (ms *microServiceBRWatcher) startWatch() { } if lastPod == nil { - ms.log.Warn("Data path pod watch loop is canceled") - ms.wgWatcher.Done() + ms.log.Warn("Watch loop is cancelled on waiting data path pod") return } epilogLoop: for !ms.startedFromEvent || !ms.terminatedFromEvent { select { + case <-ms.ctx.Done(): + ms.log.Warn("Watch loop is cancelled on waiting final event") + return case 
<-time.After(eventWaitTimeout): break epilogLoop case evt := <-ms.eventCh: @@ -339,8 +351,6 @@ func (ms *microServiceBRWatcher) startWatch() { } logger.Info("Complete callback on data path pod termination") - - ms.wgWatcher.Done() }() } @@ -348,20 +358,22 @@ func (ms *microServiceBRWatcher) onEvent(evt *v1.Event) { switch evt.Reason { case EventReasonStarted: ms.startedFromEvent = true - ms.log.Infof("Received data path start message %s", evt.Message) + ms.log.Infof("Received data path start message: %s", evt.Message) case EventReasonProgress: ms.callbacks.OnProgress(ms.ctx, ms.namespace, ms.taskName, funcGetProgressFromMessage(evt.Message, ms.log)) case EventReasonCompleted: - ms.log.Infof("Received data path completed message %v", funcGetResultFromMessage(ms.taskType, evt.Message, ms.log)) + ms.log.Infof("Received data path completed message: %v", funcGetResultFromMessage(ms.taskType, evt.Message, ms.log)) ms.terminatedFromEvent = true case EventReasonCancelled: - ms.log.Infof("Received data path canceled message %s", evt.Message) + ms.log.Infof("Received data path canceled message: %s", evt.Message) ms.terminatedFromEvent = true case EventReasonFailed: - ms.log.Infof("Received data path failed message %s", evt.Message) + ms.log.Infof("Received data path failed message: %s", evt.Message) ms.terminatedFromEvent = true + case EventReasonCancelling: + ms.log.Infof("Received data path canceling message: %s", evt.Message) default: - ms.log.Debugf("Received event for data mover %s.[reason %s, message %s]", ms.taskName, evt.Reason, evt.Message) + ms.log.Infof("Received event for data path %s,reason: %s, message: %s", ms.taskName, evt.Reason, evt.Message) } } diff --git a/pkg/datapath/micro_service_watcher_test.go b/pkg/datapath/micro_service_watcher_test.go index f10f6b3310..f926363e07 100644 --- a/pkg/datapath/micro_service_watcher_test.go +++ b/pkg/datapath/micro_service_watcher_test.go @@ -102,7 +102,7 @@ func TestReEnsureThisPod(t *testing.T) { log: velerotest.NewLogger(), } - err := ms.reEnsureThisPod() + err := ms.reEnsureThisPod(context.Background()) if test.expectErr != "" { assert.EqualError(t, err, test.expectErr) } else { From ed0ef67c16dbd119365527e2581a6af505b555f1 Mon Sep 17 00:00:00 2001 From: Lyndon-Li Date: Wed, 14 Aug 2024 13:16:35 +0800 Subject: [PATCH 2/5] data mover ms smoke testing Signed-off-by: Lyndon-Li --- pkg/cmd/cli/datamover/backup.go | 1 + pkg/cmd/cli/datamover/restore.go | 3 +- pkg/cmd/cli/nodeagent/server.go | 6 +- pkg/controller/data_download_controller.go | 58 ++++++++----------- .../data_download_controller_test.go | 13 +---- pkg/controller/data_upload_controller.go | 24 +++----- pkg/controller/data_upload_controller_test.go | 21 +------ .../pod_volume_backup_controller.go | 6 +- .../pod_volume_restore_controller.go | 6 +- pkg/datapath/micro_service_watcher.go | 29 +--------- 10 files changed, 50 insertions(+), 117 deletions(-) diff --git a/pkg/cmd/cli/datamover/backup.go b/pkg/cmd/cli/datamover/backup.go index ca600faab5..4d704b04c1 100644 --- a/pkg/cmd/cli/datamover/backup.go +++ b/pkg/cmd/cli/datamover/backup.go @@ -224,6 +224,7 @@ func (s *dataMoverBackup) runDataPath() { err = dpService.Init() if err != nil { + dpService.Shutdown() s.cancelFunc() funcExitWithMessage(s.logger, false, "Failed to init data path service for DataUpload %s: %v", s.config.duName, err) return diff --git a/pkg/cmd/cli/datamover/restore.go b/pkg/cmd/cli/datamover/restore.go index fc74a64f15..244060cc9a 100644 --- a/pkg/cmd/cli/datamover/restore.go +++ 
b/pkg/cmd/cli/datamover/restore.go @@ -215,6 +215,7 @@ func (s *dataMoverRestore) runDataPath() { err = dpService.Init() if err != nil { + dpService.Shutdown() s.cancelFunc() funcExitWithMessage(s.logger, false, "Failed to init data path service for DataDownload %s: %v", s.config.ddName, err) return @@ -222,8 +223,8 @@ func (s *dataMoverRestore) runDataPath() { result, err := dpService.RunCancelableDataPath(s.ctx) if err != nil { - s.cancelFunc() dpService.Shutdown() + s.cancelFunc() funcExitWithMessage(s.logger, false, "Failed to run data path service for DataDownload %s: %v", s.config.ddName, err) return } diff --git a/pkg/cmd/cli/nodeagent/server.go b/pkg/cmd/cli/nodeagent/server.go index 181afbf690..f30c8df4aa 100644 --- a/pkg/cmd/cli/nodeagent/server.go +++ b/pkg/cmd/cli/nodeagent/server.go @@ -292,12 +292,12 @@ func (s *nodeAgentServer) run() { if s.dataPathConfigs != nil && len(s.dataPathConfigs.LoadAffinity) > 0 { loadAffinity = s.dataPathConfigs.LoadAffinity[0] } - dataUploadReconciler := controller.NewDataUploadReconciler(s.mgr.GetClient(), s.mgr, s.kubeClient, s.csiSnapshotClient.SnapshotV1(), s.dataPathMgr, loadAffinity, repoEnsurer, clock.RealClock{}, credentialGetter, s.nodeName, s.fileSystem, s.config.dataMoverPrepareTimeout, s.logger, s.metrics) + dataUploadReconciler := controller.NewDataUploadReconciler(s.mgr.GetClient(), s.mgr, s.kubeClient, s.csiSnapshotClient.SnapshotV1(), s.dataPathMgr, loadAffinity, clock.RealClock{}, s.nodeName, s.config.dataMoverPrepareTimeout, s.logger, s.metrics) if err = dataUploadReconciler.SetupWithManager(s.mgr); err != nil { s.logger.WithError(err).Fatal("Unable to create the data upload controller") } - dataDownloadReconciler := controller.NewDataDownloadReconciler(s.mgr.GetClient(), s.mgr, s.kubeClient, s.dataPathMgr, repoEnsurer, credentialGetter, s.nodeName, s.config.dataMoverPrepareTimeout, s.logger, s.metrics) + dataDownloadReconciler := controller.NewDataDownloadReconciler(s.mgr.GetClient(), s.mgr, s.kubeClient, s.dataPathMgr, s.nodeName, s.config.dataMoverPrepareTimeout, s.logger, s.metrics) if err = dataDownloadReconciler.SetupWithManager(s.mgr); err != nil { s.logger.WithError(err).Fatal("Unable to create the data download controller") } @@ -312,8 +312,6 @@ func (s *nodeAgentServer) run() { if err := dataDownloadReconciler.AttemptDataDownloadResume(s.ctx, s.mgr.GetClient(), s.logger.WithField("node", s.nodeName), s.namespace); err != nil { s.logger.WithError(errors.WithStack(err)).Error("failed to attempt data download resume") } - - s.logger.Info("Attempt complete to resume dataUploads and dataDownloads") }() s.logger.Info("Controllers starting...") diff --git a/pkg/controller/data_download_controller.go b/pkg/controller/data_download_controller.go index a161f60bd4..0b9805a0ac 100644 --- a/pkg/controller/data_download_controller.go +++ b/pkg/controller/data_download_controller.go @@ -39,7 +39,6 @@ import ( "sigs.k8s.io/controller-runtime/pkg/predicate" "sigs.k8s.io/controller-runtime/pkg/reconcile" - "github.com/vmware-tanzu/velero/internal/credentials" "github.com/vmware-tanzu/velero/pkg/apis/velero/shared" velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" velerov2alpha1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v2alpha1" @@ -47,45 +46,37 @@ import ( "github.com/vmware-tanzu/velero/pkg/datapath" "github.com/vmware-tanzu/velero/pkg/exposer" "github.com/vmware-tanzu/velero/pkg/metrics" - repository "github.com/vmware-tanzu/velero/pkg/repository" "github.com/vmware-tanzu/velero/pkg/uploader" - 
"github.com/vmware-tanzu/velero/pkg/util/filesystem" "github.com/vmware-tanzu/velero/pkg/util/kube" ) // DataDownloadReconciler reconciles a DataDownload object type DataDownloadReconciler struct { - client client.Client - kubeClient kubernetes.Interface - mgr manager.Manager - logger logrus.FieldLogger - credentialGetter *credentials.CredentialGetter - fileSystem filesystem.Interface - Clock clock.WithTickerAndDelayedExecution - restoreExposer exposer.GenericRestoreExposer - nodeName string - repositoryEnsurer *repository.Ensurer - dataPathMgr *datapath.Manager - preparingTimeout time.Duration - metrics *metrics.ServerMetrics + client client.Client + kubeClient kubernetes.Interface + mgr manager.Manager + logger logrus.FieldLogger + Clock clock.WithTickerAndDelayedExecution + restoreExposer exposer.GenericRestoreExposer + nodeName string + dataPathMgr *datapath.Manager + preparingTimeout time.Duration + metrics *metrics.ServerMetrics } func NewDataDownloadReconciler(client client.Client, mgr manager.Manager, kubeClient kubernetes.Interface, dataPathMgr *datapath.Manager, - repoEnsurer *repository.Ensurer, credentialGetter *credentials.CredentialGetter, nodeName string, preparingTimeout time.Duration, logger logrus.FieldLogger, metrics *metrics.ServerMetrics) *DataDownloadReconciler { + nodeName string, preparingTimeout time.Duration, logger logrus.FieldLogger, metrics *metrics.ServerMetrics) *DataDownloadReconciler { return &DataDownloadReconciler{ - client: client, - kubeClient: kubeClient, - mgr: mgr, - logger: logger.WithField("controller", "DataDownload"), - credentialGetter: credentialGetter, - fileSystem: filesystem.NewFileSystem(), - Clock: &clock.RealClock{}, - nodeName: nodeName, - repositoryEnsurer: repoEnsurer, - restoreExposer: exposer.NewGenericRestoreExposer(kubeClient, logger), - dataPathMgr: dataPathMgr, - preparingTimeout: preparingTimeout, - metrics: metrics, + client: client, + kubeClient: kubeClient, + mgr: mgr, + logger: logger.WithField("controller", "DataDownload"), + Clock: &clock.RealClock{}, + nodeName: nodeName, + restoreExposer: exposer.NewGenericRestoreExposer(kubeClient, logger), + dataPathMgr: dataPathMgr, + preparingTimeout: preparingTimeout, + metrics: metrics, } } @@ -282,7 +273,7 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request } if err := r.initCancelableDataPath(ctx, asyncBR, result, log); err != nil { - log.WithError(err).Warnf("Failed to init cancelable data path for %s, will close and retry", dd.Name) + log.WithError(err).Errorf("Failed to init cancelable data path for %s", dd.Name) r.closeDataPath(ctx, dd.Name) return r.errorOut(ctx, dd, err, "error initializing data path", log) @@ -372,7 +363,7 @@ func (r *DataDownloadReconciler) startCancelableDataPath(asyncBR datapath.AsyncB return errors.Wrapf(err, "error starting async restore for pod %s, volume %s", res.ByPod.HostingPod.Name, res.ByPod.VolumeName) } - log.Info("Async restore started for pod %s, volume %s", res.ByPod.HostingPod.Name, res.ByPod.VolumeName) + log.Infof("Async restore started for pod %s, volume %s", res.ByPod.HostingPod.Name, res.ByPod.VolumeName) return nil } @@ -420,7 +411,7 @@ func (r *DataDownloadReconciler) OnDataDownloadFailed(ctx context.Context, names if getErr := r.client.Get(ctx, types.NamespacedName{Name: ddName, Namespace: namespace}, &dd); getErr != nil { log.WithError(getErr).Warn("Failed to get data download on failure") } else { - r.errorOut(ctx, &dd, err, "data path restore failed", log) + _, _ = r.errorOut(ctx, &dd, err, "data 
path restore failed", log) } } @@ -797,6 +788,7 @@ func (r *DataDownloadReconciler) AttemptDataDownloadResume(ctx context.Context, err := funcResumeCancellableDataRestore(r, ctx, dd, logger) if err == nil { + logger.WithField("dd", dd.Name).WithField("current node", r.nodeName).Info("Completed to resume in progress DD") continue } diff --git a/pkg/controller/data_download_controller_test.go b/pkg/controller/data_download_controller_test.go index d20db30f0c..bb9fe6f7c9 100644 --- a/pkg/controller/data_download_controller_test.go +++ b/pkg/controller/data_download_controller_test.go @@ -43,7 +43,6 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client/fake" - "github.com/vmware-tanzu/velero/internal/credentials" velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" velerov2alpha1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v2alpha1" "github.com/vmware-tanzu/velero/pkg/builder" @@ -139,19 +138,9 @@ func initDataDownloadReconcilerWithError(objects []runtime.Object, needError ... return nil, err } - credentialFileStore, err := credentials.NewNamespacedFileStore( - fakeClient, - velerov1api.DefaultNamespace, - "/tmp/credentials", - fakeFS, - ) - if err != nil { - return nil, err - } - dataPathMgr := datapath.NewManager(1) - return NewDataDownloadReconciler(fakeClient, nil, fakeKubeClient, dataPathMgr, nil, &credentials.CredentialGetter{FromFile: credentialFileStore}, "test-node", time.Minute*5, velerotest.NewLogger(), metrics.NewServerMetrics()), nil + return NewDataDownloadReconciler(fakeClient, nil, fakeKubeClient, dataPathMgr, "test-node", time.Minute*5, velerotest.NewLogger(), metrics.NewServerMetrics()), nil } func TestDataDownloadReconcile(t *testing.T) { diff --git a/pkg/controller/data_upload_controller.go b/pkg/controller/data_upload_controller.go index c6a15ceca8..fb9e75709d 100644 --- a/pkg/controller/data_upload_controller.go +++ b/pkg/controller/data_upload_controller.go @@ -41,7 +41,6 @@ import ( snapshotter "github.com/kubernetes-csi/external-snapshotter/client/v7/clientset/versioned/typed/volumesnapshot/v1" - "github.com/vmware-tanzu/velero/internal/credentials" "github.com/vmware-tanzu/velero/pkg/apis/velero/shared" velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" velerov2alpha1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v2alpha1" @@ -50,9 +49,7 @@ import ( "github.com/vmware-tanzu/velero/pkg/exposer" "github.com/vmware-tanzu/velero/pkg/metrics" "github.com/vmware-tanzu/velero/pkg/nodeagent" - "github.com/vmware-tanzu/velero/pkg/repository" "github.com/vmware-tanzu/velero/pkg/uploader" - "github.com/vmware-tanzu/velero/pkg/util/filesystem" "github.com/vmware-tanzu/velero/pkg/util/kube" ) @@ -69,11 +66,8 @@ type DataUploadReconciler struct { kubeClient kubernetes.Interface csiSnapshotClient snapshotter.SnapshotV1Interface mgr manager.Manager - repoEnsurer *repository.Ensurer Clock clocks.WithTickerAndDelayedExecution - credentialGetter *credentials.CredentialGetter nodeName string - fileSystem filesystem.Interface logger logrus.FieldLogger snapshotExposerList map[velerov2alpha1api.SnapshotType]exposer.SnapshotExposer dataPathMgr *datapath.Manager @@ -83,19 +77,16 @@ type DataUploadReconciler struct { } func NewDataUploadReconciler(client client.Client, mgr manager.Manager, kubeClient kubernetes.Interface, csiSnapshotClient snapshotter.SnapshotV1Interface, - dataPathMgr *datapath.Manager, loadAffinity *nodeagent.LoadAffinity, repoEnsurer *repository.Ensurer, clock clocks.WithTickerAndDelayedExecution, - cred *credentials.CredentialGetter, 
nodeName string, fs filesystem.Interface, preparingTimeout time.Duration, log logrus.FieldLogger, metrics *metrics.ServerMetrics) *DataUploadReconciler { + dataPathMgr *datapath.Manager, loadAffinity *nodeagent.LoadAffinity, clock clocks.WithTickerAndDelayedExecution, nodeName string, preparingTimeout time.Duration, + log logrus.FieldLogger, metrics *metrics.ServerMetrics) *DataUploadReconciler { return &DataUploadReconciler{ client: client, mgr: mgr, kubeClient: kubeClient, csiSnapshotClient: csiSnapshotClient, Clock: clock, - credentialGetter: cred, nodeName: nodeName, - fileSystem: fs, logger: log, - repoEnsurer: repoEnsurer, snapshotExposerList: map[velerov2alpha1api.SnapshotType]exposer.SnapshotExposer{velerov2alpha1api.SnapshotTypeCSI: exposer.NewCSISnapshotExposer(kubeClient, csiSnapshotClient, log)}, dataPathMgr: dataPathMgr, loadAffinity: loadAffinity, @@ -293,7 +284,7 @@ func (r *DataUploadReconciler) Reconcile(ctx context.Context, req ctrl.Request) } if err := r.initCancelableDataPath(ctx, asyncBR, res, log); err != nil { - log.WithError(err).Warnf("Failed to init cancelable data path for %s, will close and retry", du.Name) + log.WithError(err).Errorf("Failed to init cancelable data path for %s", du.Name) r.closeDataPath(ctx, du.Name) return r.errorOut(ctx, du, err, "error initializing data path", log) @@ -383,7 +374,7 @@ func (r *DataUploadReconciler) startCancelableDataPath(asyncBR datapath.AsyncBR, return errors.Wrapf(err, "error starting async backup for pod %s, volume %s", res.ByPod.HostingPod.Name, res.ByPod.VolumeName) } - log.Info("Async backup started for pod %s, volume %s", res.ByPod.HostingPod.Name, res.ByPod.VolumeName) + log.Infof("Async backup started for pod %s, volume %s", res.ByPod.HostingPod.Name, res.ByPod.VolumeName) return nil } @@ -442,7 +433,7 @@ func (r *DataUploadReconciler) OnDataUploadFailed(ctx context.Context, namespace if getErr := r.client.Get(ctx, types.NamespacedName{Name: duName, Namespace: namespace}, &du); getErr != nil { log.WithError(getErr).Warn("Failed to get dataupload on failure") } else { - r.errorOut(ctx, &du, err, "data path backup failed", log) + _, _ = r.errorOut(ctx, &du, err, "data path backup failed", log) } } @@ -502,7 +493,7 @@ func (r *DataUploadReconciler) tryCancelAcceptedDataUpload(ctx context.Context, r.cleanUp(ctx, du, log) } -func (r *DataUploadReconciler) cleanUp(ctx context.Context, du *velerov2alpha1api.DataUpload, log *logrus.Entry) { +func (r *DataUploadReconciler) cleanUp(ctx context.Context, du *velerov2alpha1api.DataUpload, log logrus.FieldLogger) { ep, ok := r.snapshotExposerList[du.Spec.SnapshotType] if !ok { log.WithError(fmt.Errorf("%v type of snapshot exposer is not exist", du.Spec.SnapshotType)). 
@@ -650,7 +641,7 @@ func (r *DataUploadReconciler) errorOut(ctx context.Context, du *velerov2alpha1a } se.CleanUp(ctx, getOwnerObject(du), volumeSnapshotName, du.Spec.SourceNamespace) } else { - log.Warnf("failed to clean up exposed snapshot could not find %s snapshot exposer", du.Spec.SnapshotType) + log.Errorf("failed to clean up exposed snapshot could not find %s snapshot exposer", du.Spec.SnapshotType) } return ctrl.Result{}, r.updateStatusToFailed(ctx, du, err, msg, log) @@ -894,6 +885,7 @@ func (r *DataUploadReconciler) AttemptDataUploadResume(ctx context.Context, cli err := funcResumeCancellableDataBackup(r, ctx, du, logger) if err == nil { + logger.WithField("du", du.Name).WithField("current node", r.nodeName).Info("Completed to resume in progress DU") continue } diff --git a/pkg/controller/data_upload_controller_test.go b/pkg/controller/data_upload_controller_test.go index 8df98b60dc..e4c98f1968 100644 --- a/pkg/controller/data_upload_controller_test.go +++ b/pkg/controller/data_upload_controller_test.go @@ -47,7 +47,6 @@ import ( "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/reconcile" - "github.com/vmware-tanzu/velero/internal/credentials" velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" velerov2alpha1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v2alpha1" "github.com/vmware-tanzu/velero/pkg/builder" @@ -229,24 +228,9 @@ func initDataUploaderReconcilerWithError(needError ...error) (*DataUploadReconci fakeSnapshotClient := snapshotFake.NewSimpleClientset(vsObject, vscObj) fakeKubeClient := clientgofake.NewSimpleClientset(daemonSet) - fakeFS := velerotest.NewFakeFileSystem() - pathGlob := fmt.Sprintf("/host_pods/%s/volumes/*/%s", "", dataUploadName) - _, err = fakeFS.Create(pathGlob) - if err != nil { - return nil, err - } - credentialFileStore, err := credentials.NewNamespacedFileStore( - fakeClient, - velerov1api.DefaultNamespace, - "/tmp/credentials", - fakeFS, - ) - if err != nil { - return nil, err - } - return NewDataUploadReconciler(fakeClient, nil, fakeKubeClient, fakeSnapshotClient.SnapshotV1(), dataPathMgr, nil, nil, - testclocks.NewFakeClock(now), &credentials.CredentialGetter{FromFile: credentialFileStore}, "test-node", fakeFS, time.Minute*5, velerotest.NewLogger(), metrics.NewServerMetrics()), nil + return NewDataUploadReconciler(fakeClient, nil, fakeKubeClient, fakeSnapshotClient.SnapshotV1(), dataPathMgr, nil, + testclocks.NewFakeClock(now), "test-node", time.Minute*5, velerotest.NewLogger(), metrics.NewServerMetrics()), nil } func dataUploadBuilder() *builder.DataUploadBuilder { @@ -626,7 +610,6 @@ func TestReconcile(t *testing.T) { if !testCreateFsBR && du.Status.Phase != velerov2alpha1api.DataUploadPhaseInProgress { assert.Nil(t, r.dataPathMgr.GetAsyncBR(test.du.Name)) } - }) } } diff --git a/pkg/controller/pod_volume_backup_controller.go b/pkg/controller/pod_volume_backup_controller.go index e626df2b35..548ab0d0c6 100644 --- a/pkg/controller/pod_volume_backup_controller.go +++ b/pkg/controller/pod_volume_backup_controller.go @@ -202,7 +202,7 @@ func (r *PodVolumeBackupReconciler) Reconcile(ctx context.Context, req ctrl.Requ } func (r *PodVolumeBackupReconciler) OnDataPathCompleted(ctx context.Context, namespace string, pvbName string, result datapath.Result) { - defer r.closeDataPath(ctx, pvbName) + defer r.dataPathMgr.RemoveAsyncBR(pvbName) log := r.logger.WithField("pvb", pvbName) @@ -240,7 +240,7 @@ func (r *PodVolumeBackupReconciler) OnDataPathCompleted(ctx context.Context, nam } func (r 
*PodVolumeBackupReconciler) OnDataPathFailed(ctx context.Context, namespace, pvbName string, err error) { - defer r.closeDataPath(ctx, pvbName) + defer r.dataPathMgr.RemoveAsyncBR(pvbName) log := r.logger.WithField("pvb", pvbName) @@ -255,7 +255,7 @@ func (r *PodVolumeBackupReconciler) OnDataPathFailed(ctx context.Context, namesp } func (r *PodVolumeBackupReconciler) OnDataPathCancelled(ctx context.Context, namespace string, pvbName string) { - defer r.closeDataPath(ctx, pvbName) + defer r.dataPathMgr.RemoveAsyncBR(pvbName) log := r.logger.WithField("pvb", pvbName) diff --git a/pkg/controller/pod_volume_restore_controller.go b/pkg/controller/pod_volume_restore_controller.go index 3f285789db..c4a3e7451f 100644 --- a/pkg/controller/pod_volume_restore_controller.go +++ b/pkg/controller/pod_volume_restore_controller.go @@ -265,7 +265,7 @@ func getInitContainerIndex(pod *corev1api.Pod) int { } func (c *PodVolumeRestoreReconciler) OnDataPathCompleted(ctx context.Context, namespace string, pvrName string, result datapath.Result) { - defer c.closeDataPath(ctx, pvrName) + defer c.dataPathMgr.RemoveAsyncBR(pvrName) log := c.logger.WithField("pvr", pvrName) @@ -325,7 +325,7 @@ func (c *PodVolumeRestoreReconciler) OnDataPathCompleted(ctx context.Context, na } func (c *PodVolumeRestoreReconciler) OnDataPathFailed(ctx context.Context, namespace string, pvrName string, err error) { - defer c.closeDataPath(ctx, pvrName) + defer c.dataPathMgr.RemoveAsyncBR(pvrName) log := c.logger.WithField("pvr", pvrName) @@ -340,7 +340,7 @@ func (c *PodVolumeRestoreReconciler) OnDataPathFailed(ctx context.Context, names } func (c *PodVolumeRestoreReconciler) OnDataPathCancelled(ctx context.Context, namespace string, pvrName string) { - defer c.closeDataPath(ctx, pvrName) + defer c.dataPathMgr.RemoveAsyncBR(pvrName) log := c.logger.WithField("pvr", pvrName) diff --git a/pkg/datapath/micro_service_watcher.go b/pkg/datapath/micro_service_watcher.go index 8a129bde8f..d74ca2fc2c 100644 --- a/pkg/datapath/micro_service_watcher.go +++ b/pkg/datapath/micro_service_watcher.go @@ -103,8 +103,6 @@ func newMicroServiceBRWatcher(client client.Client, kubeClient kubernetes.Interf } func (ms *microServiceBRWatcher) Init(ctx context.Context, param interface{}) error { - succeeded := false - eventInformer, err := ms.mgr.GetCache().GetInformer(ctx, &v1.Event{}) if err != nil { return errors.Wrap(err, "error getting event informer") @@ -135,19 +133,10 @@ func (ms *microServiceBRWatcher) Init(ctx context.Context, param interface{}) er }, }, ) - if err != nil { return errors.Wrap(err, "error registering event handler") } - defer func() { - if !succeeded { - if err := eventInformer.RemoveEventHandler(eventHandler); err != nil { - ms.log.WithError(err).Warn("Failed to remove event handler") - } - } - }() - podHandler, err := podInformer.AddEventHandler( cache.ResourceEventHandlerFuncs{ UpdateFunc: func(_, obj interface{}) { @@ -162,19 +151,10 @@ func (ms *microServiceBRWatcher) Init(ctx context.Context, param interface{}) er }, }, ) - if err != nil { return errors.Wrap(err, "error registering pod handler") } - defer func() { - if !succeeded { - if err := podInformer.RemoveEventHandler(podHandler); err != nil { - ms.log.WithError(err).Warn("Failed to remove pod handler") - } - } - }() - if err := ms.reEnsureThisPod(ctx); err != nil { return err } @@ -193,10 +173,7 @@ func (ms *microServiceBRWatcher) Init(ctx context.Context, param interface{}) er "thisPod": ms.thisPod, }).Info("MicroServiceBR is initialized") - succeeded = true - return nil - } 
func (ms *microServiceBRWatcher) Close(ctx context.Context) { @@ -301,7 +278,7 @@ func (ms *microServiceBRWatcher) startWatch() { } if lastPod == nil { - ms.log.Warn("Watch loop is cancelled on waiting data path pod") + ms.log.Warn("Watch loop is canceled on waiting data path pod") return } @@ -309,7 +286,7 @@ func (ms *microServiceBRWatcher) startWatch() { for !ms.startedFromEvent || !ms.terminatedFromEvent { select { case <-ms.ctx.Done(): - ms.log.Warn("Watch loop is cancelled on waiting final event") + ms.log.Warn("Watch loop is canceled on waiting final event") return case <-time.After(eventWaitTimeout): break epilogLoop @@ -373,7 +350,7 @@ func (ms *microServiceBRWatcher) onEvent(evt *v1.Event) { case EventReasonCancelling: ms.log.Infof("Received data path canceling message: %s", evt.Message) default: - ms.log.Infof("Received event for data path %s,reason: %s, message: %s", ms.taskName, evt.Reason, evt.Message) + ms.log.Infof("Received event for data path %s, reason: %s, message: %s", ms.taskName, evt.Reason, evt.Message) } } From 0ed1a7fc8699a4205e538d8cf65c9b24ac9dd2ad Mon Sep 17 00:00:00 2001 From: Lyndon-Li Date: Thu, 15 Aug 2024 15:06:31 +0800 Subject: [PATCH 3/5] data mover ms smoke testing Signed-off-by: Lyndon-Li --- pkg/cmd/cli/nodeagent/server.go | 38 ++++++++++++++++--- pkg/controller/data_download_controller.go | 8 ++-- .../data_download_controller_test.go | 2 +- pkg/controller/data_upload_controller.go | 8 ++-- pkg/controller/data_upload_controller_test.go | 2 +- 5 files changed, 43 insertions(+), 15 deletions(-) diff --git a/pkg/cmd/cli/nodeagent/server.go b/pkg/cmd/cli/nodeagent/server.go index 513eb89de1..8aa52b1c56 100644 --- a/pkg/cmd/cli/nodeagent/server.go +++ b/pkg/cmd/cli/nodeagent/server.go @@ -61,6 +61,8 @@ import ( "github.com/vmware-tanzu/velero/pkg/repository" "github.com/vmware-tanzu/velero/pkg/util/filesystem" "github.com/vmware-tanzu/velero/pkg/util/logging" + + cacheutil "k8s.io/client-go/tools/cache" ) var ( @@ -309,14 +311,17 @@ func (s *nodeAgentServer) run() { } go func() { - s.mgr.GetCache().WaitForCacheSync(s.ctx) + if err := s.waitCacheForResume(); err != nil { + s.logger.WithError(err).Error("Failed to wait cache for resume, will not resume DU/DD") + return + } - if err := dataUploadReconciler.AttemptDataUploadResume(s.ctx, s.mgr.GetClient(), s.logger.WithField("node", s.nodeName), s.namespace); err != nil { - s.logger.WithError(errors.WithStack(err)).Error("failed to attempt data upload resume") + if err := dataUploadReconciler.AttemptDataUploadResume(s.ctx, s.logger.WithField("node", s.nodeName), s.namespace); err != nil { + s.logger.WithError(errors.WithStack(err)).Error("Failed to attempt data upload resume") } - if err := dataDownloadReconciler.AttemptDataDownloadResume(s.ctx, s.mgr.GetClient(), s.logger.WithField("node", s.nodeName), s.namespace); err != nil { - s.logger.WithError(errors.WithStack(err)).Error("failed to attempt data download resume") + if err := dataDownloadReconciler.AttemptDataDownloadResume(s.ctx, s.logger.WithField("node", s.nodeName), s.namespace); err != nil { + s.logger.WithError(errors.WithStack(err)).Error("Failed to attempt data download resume") } }() @@ -327,6 +332,29 @@ func (s *nodeAgentServer) run() { } } +func (s *nodeAgentServer) waitCacheForResume() error { + podInformer, err := s.mgr.GetCache().GetInformer(s.ctx, &v1.Pod{}) + if err != nil { + return errors.Wrap(err, "error getting pod informer") + } + + duInformer, err := s.mgr.GetCache().GetInformer(s.ctx, &velerov2alpha1api.DataUpload{}) + if err != 
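The changes above split the former runCancelableDataPath into an init step and a start step, closing the data path on every failure so a later reconcile can rebuild it from a clean state. Below is a minimal, self-contained sketch of that control flow; all types and helpers are illustrative stand-ins, not Velero's actual API:

```go
// Sketch of the init -> mark InProgress -> start sequence: any failure after
// the data path exists must close it before returning. Names are illustrative.
package main

import (
	"context"
	"fmt"
)

type dataPath struct{ open bool }

func (d *dataPath) Init(ctx context.Context) error { d.open = true; return nil }
func (d *dataPath) Start() error                   { return nil }
func (d *dataPath) Close()                         { d.open = false }

func reconcilePrepared(ctx context.Context, dp *dataPath, markInProgress func() error) error {
	if err := dp.Init(ctx); err != nil {
		dp.Close()
		return fmt.Errorf("error initializing data path: %w", err)
	}

	if err := markInProgress(); err != nil {
		// The status update failed: close the data path and requeue rather
		// than failing the whole operation, mirroring the controller change.
		dp.Close()
		return fmt.Errorf("failed to mark InProgress, will requeue: %w", err)
	}

	if err := dp.Start(); err != nil {
		dp.Close()
		return fmt.Errorf("error starting data path: %w", err)
	}

	return nil
}

func main() {
	err := reconcilePrepared(context.Background(), &dataPath{}, func() error { return nil })
	fmt.Println("reconcile result:", err)
}
```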
From 0ed1a7fc8699a4205e538d8cf65c9b24ac9dd2ad Mon Sep 17 00:00:00 2001
From: Lyndon-Li
Date: Thu, 15 Aug 2024 15:06:31 +0800
Subject: [PATCH 3/5] data mover ms smoke testing

Signed-off-by: Lyndon-Li
---
 pkg/cmd/cli/nodeagent/server.go               | 38 ++++++++++++++++---
 pkg/controller/data_download_controller.go    |  8 ++--
 .../data_download_controller_test.go          |  2 +-
 pkg/controller/data_upload_controller.go      |  8 ++--
 pkg/controller/data_upload_controller_test.go |  2 +-
 5 files changed, 43 insertions(+), 15 deletions(-)

diff --git a/pkg/cmd/cli/nodeagent/server.go b/pkg/cmd/cli/nodeagent/server.go
index 513eb89de1..8aa52b1c56 100644
--- a/pkg/cmd/cli/nodeagent/server.go
+++ b/pkg/cmd/cli/nodeagent/server.go
@@ -61,6 +61,8 @@ import (
 	"github.com/vmware-tanzu/velero/pkg/repository"
 	"github.com/vmware-tanzu/velero/pkg/util/filesystem"
 	"github.com/vmware-tanzu/velero/pkg/util/logging"
+
+	cacheutil "k8s.io/client-go/tools/cache"
 )

 var (
@@ -309,14 +311,17 @@ func (s *nodeAgentServer) run() {
 	}

 	go func() {
-		s.mgr.GetCache().WaitForCacheSync(s.ctx)
+		if err := s.waitCacheForResume(); err != nil {
+			s.logger.WithError(err).Error("Failed to wait cache for resume, will not resume DU/DD")
+			return
+		}

-		if err := dataUploadReconciler.AttemptDataUploadResume(s.ctx, s.mgr.GetClient(), s.logger.WithField("node", s.nodeName), s.namespace); err != nil {
-			s.logger.WithError(errors.WithStack(err)).Error("failed to attempt data upload resume")
+		if err := dataUploadReconciler.AttemptDataUploadResume(s.ctx, s.logger.WithField("node", s.nodeName), s.namespace); err != nil {
+			s.logger.WithError(errors.WithStack(err)).Error("Failed to attempt data upload resume")
 		}

-		if err := dataDownloadReconciler.AttemptDataDownloadResume(s.ctx, s.mgr.GetClient(), s.logger.WithField("node", s.nodeName), s.namespace); err != nil {
-			s.logger.WithError(errors.WithStack(err)).Error("failed to attempt data download resume")
+		if err := dataDownloadReconciler.AttemptDataDownloadResume(s.ctx, s.logger.WithField("node", s.nodeName), s.namespace); err != nil {
+			s.logger.WithError(errors.WithStack(err)).Error("Failed to attempt data download resume")
 		}
 	}()

@@ -327,6 +332,29 @@ func (s *nodeAgentServer) run() {
 	}
 }

+func (s *nodeAgentServer) waitCacheForResume() error {
+	podInformer, err := s.mgr.GetCache().GetInformer(s.ctx, &v1.Pod{})
+	if err != nil {
+		return errors.Wrap(err, "error getting pod informer")
+	}
+
+	duInformer, err := s.mgr.GetCache().GetInformer(s.ctx, &velerov2alpha1api.DataUpload{})
+	if err != nil {
+		return errors.Wrap(err, "error getting du informer")
+	}
+
+	ddInformer, err := s.mgr.GetCache().GetInformer(s.ctx, &velerov2alpha1api.DataDownload{})
+	if err != nil {
+		return errors.Wrap(err, "error getting dd informer")
+	}
+
+	if !cacheutil.WaitForCacheSync(s.ctx.Done(), podInformer.HasSynced, duInformer.HasSynced, ddInformer.HasSynced) {
+		return errors.New("error waiting informer synced")
+	}
+
+	return nil
+}
+
 // validatePodVolumesHostPath validates that the pod volumes path contains a
 // directory for each Pod running on this node
 func (s *nodeAgentServer) validatePodVolumesHostPath(client kubernetes.Interface) error {
diff --git a/pkg/controller/data_download_controller.go b/pkg/controller/data_download_controller.go
index 0b9805a0ac..990d7455b4 100644
--- a/pkg/controller/data_download_controller.go
+++ b/pkg/controller/data_download_controller.go
@@ -767,9 +767,9 @@ func UpdateDataDownloadWithRetry(ctx context.Context, client client.Client, name

 var funcResumeCancellableDataRestore = (*DataDownloadReconciler).resumeCancellableDataPath

-func (r *DataDownloadReconciler) AttemptDataDownloadResume(ctx context.Context, cli client.Client, logger *logrus.Entry, ns string) error {
+func (r *DataDownloadReconciler) AttemptDataDownloadResume(ctx context.Context, logger *logrus.Entry, ns string) error {
 	dataDownloads := &velerov2alpha1api.DataDownloadList{}
-	if err := cli.List(ctx, dataDownloads, &client.ListOptions{Namespace: ns}); err != nil {
+	if err := r.client.List(ctx, dataDownloads, &client.ListOptions{Namespace: ns}); err != nil {
 		r.logger.WithError(errors.WithStack(err)).Error("failed to list datadownloads")
 		return errors.Wrapf(err, "error to list datadownloads")
 	}
@@ -795,7 +795,7 @@ func (r *DataDownloadReconciler) AttemptDataDownloadResume(ctx context.Context,
 			logger.WithField("datadownload", dd.GetName()).WithError(err).Warn("Failed to resume data path for dd, have to cancel it")

 			resumeErr := err
-			err = UpdateDataDownloadWithRetry(ctx, cli, types.NamespacedName{Namespace: dd.Namespace, Name: dd.Name}, logger.WithField("datadownload", dd.Name),
+			err = UpdateDataDownloadWithRetry(ctx, r.client, types.NamespacedName{Namespace: dd.Namespace, Name: dd.Name}, logger.WithField("datadownload", dd.Name),
 				func(dataDownload *velerov2alpha1api.DataDownload) bool {
 					if dataDownload.Spec.Cancel {
 						return false
@@ -812,7 +812,7 @@ func (r *DataDownloadReconciler) AttemptDataDownloadResume(ctx context.Context,
 		} else if dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseAccepted {
 			r.logger.WithField("datadownload", dd.GetName()).Warn("Cancel dd under Accepted phase")

-			err := UpdateDataDownloadWithRetry(ctx, cli, types.NamespacedName{Namespace: dd.Namespace, Name: dd.Name},
+			err := UpdateDataDownloadWithRetry(ctx, r.client, types.NamespacedName{Namespace: dd.Namespace, Name: dd.Name},
 				r.logger.WithField("datadownload", dd.Name), func(dataDownload *velerov2alpha1api.DataDownload) bool {
 					if dataDownload.Spec.Cancel {
 						return false
diff --git a/pkg/controller/data_download_controller_test.go b/pkg/controller/data_download_controller_test.go
index bb9fe6f7c9..a54135d034 100644
--- a/pkg/controller/data_download_controller_test.go
+++ b/pkg/controller/data_download_controller_test.go
@@ -1079,7 +1079,7 @@ func TestAttemptDataDownloadResume(t *testing.T) {
 			funcResumeCancellableDataBackup = dt.resumeCancellableDataPath

 			// Run the test
-			err = r.AttemptDataDownloadResume(ctx, r.client, r.logger.WithField("name", test.name), test.dd.Namespace)
+			err = r.AttemptDataDownloadResume(ctx, r.logger.WithField("name", test.name), test.dd.Namespace)

 			if test.expectedError != "" {
 				assert.EqualError(t, err, test.expectedError)
diff --git a/pkg/controller/data_upload_controller.go b/pkg/controller/data_upload_controller.go
index 4c26baf38c..22dc59bd53 100644
--- a/pkg/controller/data_upload_controller.go
+++ b/pkg/controller/data_upload_controller.go
@@ -867,9 +867,9 @@ func UpdateDataUploadWithRetry(ctx context.Context, client client.Client, namesp

 var funcResumeCancellableDataBackup = (*DataUploadReconciler).resumeCancellableDataPath

-func (r *DataUploadReconciler) AttemptDataUploadResume(ctx context.Context, cli client.Client, logger *logrus.Entry, ns string) error {
+func (r *DataUploadReconciler) AttemptDataUploadResume(ctx context.Context, logger *logrus.Entry, ns string) error {
 	dataUploads := &velerov2alpha1api.DataUploadList{}
-	if err := cli.List(ctx, dataUploads, &client.ListOptions{Namespace: ns}); err != nil {
+	if err := r.client.List(ctx, dataUploads, &client.ListOptions{Namespace: ns}); err != nil {
 		r.logger.WithError(errors.WithStack(err)).Error("failed to list datauploads")
 		return errors.Wrapf(err, "error to list datauploads")
 	}
@@ -895,7 +895,7 @@ func (r *DataUploadReconciler) AttemptDataUploadResume(ctx context.Context, cli
 			logger.WithField("dataupload", du.GetName()).WithError(err).Warn("Failed to resume data path for du, have to cancel it")

 			resumeErr := err
-			err = UpdateDataUploadWithRetry(ctx, cli, types.NamespacedName{Namespace: du.Namespace, Name: du.Name}, logger.WithField("dataupload", du.Name),
+			err = UpdateDataUploadWithRetry(ctx, r.client, types.NamespacedName{Namespace: du.Namespace, Name: du.Name}, logger.WithField("dataupload", du.Name),
 				func(dataUpload *velerov2alpha1api.DataUpload) bool {
 					if dataUpload.Spec.Cancel {
 						return false
@@ -912,7 +912,7 @@ func (r *DataUploadReconciler) AttemptDataUploadResume(ctx context.Context, cli
 		} else if du.Status.Phase == velerov2alpha1api.DataUploadPhaseAccepted {
 			r.logger.WithField("dataupload", du.GetName()).Warn("Cancel du under Accepted phase")

-			err := UpdateDataUploadWithRetry(ctx, cli, types.NamespacedName{Namespace: du.Namespace, Name: du.Name}, r.logger.WithField("dataupload", du.Name),
+			err := UpdateDataUploadWithRetry(ctx, r.client, types.NamespacedName{Namespace: du.Namespace, Name: du.Name}, r.logger.WithField("dataupload", du.Name),
 				func(dataUpload *velerov2alpha1api.DataUpload) bool {
 					if dataUpload.Spec.Cancel {
 						return false
diff --git a/pkg/controller/data_upload_controller_test.go b/pkg/controller/data_upload_controller_test.go
index b373a7a6af..a6ee25574d 100644
--- a/pkg/controller/data_upload_controller_test.go
+++ b/pkg/controller/data_upload_controller_test.go
@@ -1127,7 +1127,7 @@ func TestAttemptDataUploadResume(t *testing.T) {
 			funcResumeCancellableDataBackup = dt.resumeCancellableDataPath

 			// Run the test
-			err = r.AttemptDataUploadResume(ctx, r.client, r.logger.WithField("name", test.name), test.du.Namespace)
+			err = r.AttemptDataUploadResume(ctx, r.logger.WithField("name", test.name), test.du.Namespace)

 			if test.expectedError != "" {
 				assert.EqualError(t, err, test.expectedError)
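The waitCacheForResume change above gates DU/DD resume on informer cache sync, so the resume logic never lists DataUploads or DataDownloads from a partially populated cache. The sketch below shows the same client-go WaitForCacheSync pattern in isolation; the sync function is a stub standing in for the pod/DataUpload/DataDownload informers' HasSynced:

```go
// Minimal sketch of gating work on informer cache sync with client-go's
// WaitForCacheSync. The synced func is a stub; in the real server each
// argument is an informer's HasSynced method.
package main

import (
	"context"
	"fmt"

	cacheutil "k8s.io/client-go/tools/cache"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Stand-in for podInformer.HasSynced, duInformer.HasSynced, etc.
	synced := func() bool { return true }

	if !cacheutil.WaitForCacheSync(ctx.Done(), synced, synced, synced) {
		fmt.Println("informers did not sync; skipping DU/DD resume")
		return
	}
	fmt.Println("caches synced; safe to attempt DU/DD resume")
}
```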
From d4e7d1472e82f283150627a74a145d6f6aea568d Mon Sep 17 00:00:00 2001
From: Shubham Pampattiwar
Date: Thu, 15 Aug 2024 12:18:54 -0700
Subject: [PATCH 4/5] add docs for backup pvc config support

Signed-off-by: Shubham Pampattiwar

add changelog

Signed-off-by: Shubham Pampattiwar

add section to csi dm doc and minor fixes

Signed-off-by: Shubham Pampattiwar

configMap name is configurable

Signed-off-by: Shubham Pampattiwar
---
 .../unreleased/8119-shubham-pampattiwar       |  1 +
 .../docs/main/csi-snapshot-data-movement.md   |  7 ++-
 .../data-movement-backup-pvc-configuration.md | 54 +++++++++++++++++++
 site/data/docs/main-toc.yml                   |  4 +-
 4 files changed, 64 insertions(+), 2 deletions(-)
 create mode 100644 changelogs/unreleased/8119-shubham-pampattiwar
 create mode 100644 site/content/docs/main/data-movement-backup-pvc-configuration.md

diff --git a/changelogs/unreleased/8119-shubham-pampattiwar b/changelogs/unreleased/8119-shubham-pampattiwar
new file mode 100644
index 0000000000..48b4c0b091
--- /dev/null
+++ b/changelogs/unreleased/8119-shubham-pampattiwar
@@ -0,0 +1 @@
+Add docs for backup pvc config support
diff --git a/site/content/docs/main/csi-snapshot-data-movement.md b/site/content/docs/main/csi-snapshot-data-movement.md
index 60d72aecf9..31419e3ee2 100644
--- a/site/content/docs/main/csi-snapshot-data-movement.md
+++ b/site/content/docs/main/csi-snapshot-data-movement.md
@@ -491,8 +491,11 @@ For Velero built-in data mover, it uses Kubernetes' scheduler to mount a snapsho
 For the backup, you can intervene this scheduling process through [Data Movement Backup Node Selection][15], so that you can decide which node(s) should/should not run the data movement backup for various purposes.
 For the restore, this is not supported because sometimes the data movement restore must run in the same node where the restored workload pod is scheduled.

+### BackupPVC Configuration
-
+The `BackupPVC` serves as an intermediate Persistent Volume Claim (PVC) utilized during data movement backup operations, providing efficient access to data.
+In complex storage environments, optimizing `BackupPVC` configurations can significantly enhance the performance of backup operations. [This document][16] outlines
+advanced configuration options for `BackupPVC`, allowing users to fine-tune access modes and storage class settings based on their storage provider's capabilities.

 [1]: https://github.com/vmware-tanzu/velero/pull/5968
 [2]: csi.md
@@ -509,3 +512,5 @@ For the restore, this is not supported because sometimes the data movement resto
 [13]: https://kubernetes.io/docs/concepts/workloads/pods/pod-qos/
 [14]: node-agent-concurrency.md
 [15]: data-movement-backup-node-selection.md
+[16]: data-movement-backup-pvc-configuration.md
+
diff --git a/site/content/docs/main/data-movement-backup-pvc-configuration.md b/site/content/docs/main/data-movement-backup-pvc-configuration.md
new file mode 100644
index 0000000000..61bbf53e14
--- /dev/null
+++ b/site/content/docs/main/data-movement-backup-pvc-configuration.md
@@ -0,0 +1,54 @@
+---
+title: "BackupPVC Configuration for Data Movement Backup"
+layout: docs
+---
+
+`BackupPVC` is an intermediate PVC that the data movement backup operation reads data from.
+
+In some scenarios, users may need to configure advanced options for the backupPVC so that the data movement backup
+operation performs better. Specifically:
+- For some storage providers, creating a read-only volume from a snapshot is very fast, whereas creating a writable volume
+  from the snapshot requires cloning the entire disk data, which is time consuming. If the `backupPVC`'s `accessModes` is
+  set to `ReadOnlyMany`, the volume driver can tell the storage to create a read-only volume, which may dramatically shorten the
+  snapshot expose time. On the other hand, `ReadOnlyMany` is not supported by all volumes. Therefore, users should be allowed to configure
+  the `accessModes` for the `backupPVC`.
+- Some storage providers create one or more replicas when creating a volume; the number of replicas is defined in the storage class.
+  However, it makes no sense to keep replicas for an intermediate volume that is only used by the backup. Therefore, users should be allowed
+  to configure another storage class specifically for the `backupPVC`.
+
+Velero introduces a new section in the node agent configuration configMap (the name of this configMap is passed using the `--node-agent-config` velero server argument)
+called `backupPVC`, through which you can specify the following
+configurations:
+
+- `storageClass`: This specifies the storage class to be used for the backupPVC. If this value does not exist or is empty, the
+source PVC's storage class will be used by default.
+
+- `readOnly`: This is a boolean value. If set to `true`, `ReadOnlyMany` will be the only value set in the backupPVC's access modes. Otherwise
+`ReadWriteOnce` will be used.
+
+A sample of the `backupPVC` config as part of the configMap would look like:
+```json
+{
+    "backupPVC": {
+        "storage-class-1": {
+            "storageClass": "backupPVC-storage-class",
+            "readOnly": true
+        },
+        "storage-class-2": {
+            "storageClass": "backupPVC-storage-class"
+        },
+        "storage-class-3": {
+            "readOnly": true
+        }
+    }
+}
+```
+
+**Note:**
+- Users should make sure that the storage class specified in the `backupPVC` config exists in the cluster and can be used by the
+`backupPVC`; otherwise the corresponding DataUpload CR will stay in the `Accepted` phase until timeout (the data movement prepare timeout is 30m by default).
+- If users set `readOnly` to `true` in the `backupPVC` config, they must also make sure that the storage class used for the
+`backupPVC` supports creating a `ReadOnlyMany` PVC from a snapshot; otherwise the corresponding DataUpload CR will stay in the `Accepted` phase until
+timeout (the data movement prepare timeout is 30m by default).
+- If any of the above problems occur, the DataUpload CR is canceled after timeout, the backupPod and backupPVC are deleted, and the backup
+is marked as `PartiallyFailed`.
diff --git a/site/data/docs/main-toc.yml b/site/data/docs/main-toc.yml
index cab5468237..ec5f046043 100644
--- a/site/data/docs/main-toc.yml
+++ b/site/data/docs/main-toc.yml
@@ -52,7 +52,9 @@ toc:
     - page: CSI Snapshot Data Movement
       url: /csi-snapshot-data-movement
     - page: Node-agent Concurrency
-      url: /node-agent-concurrency
+      url: /node-agent-concurrency
+    - page: Backup PVC Configuration
+      url: /data-movement-backup-pvc-configuration
     - page: Verifying Self-signed Certificates
       url: /self-signed-certificates
     - page: Changing RBAC permissions
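To make the shape of the `backupPVC` configuration above concrete, here is a small sketch of how a section like the sample JSON could be unmarshaled in Go. The field names match the doc's JSON, but the struct and variable names are assumptions for illustration, not Velero's actual types:

```go
// Illustrative sketch: decoding a node-agent-style config that carries the
// "backupPVC" section keyed by the source PVC's storage class name.
package main

import (
	"encoding/json"
	"fmt"
)

// BackupPVCConfig mirrors the per-storage-class options described in the doc.
type BackupPVCConfig struct {
	StorageClass string `json:"storageClass,omitempty"`
	ReadOnly     bool   `json:"readOnly,omitempty"`
}

type nodeAgentConfig struct {
	BackupPVC map[string]BackupPVCConfig `json:"backupPVC,omitempty"`
}

func main() {
	data := []byte(`{
	  "backupPVC": {
	    "storage-class-1": {"storageClass": "backupPVC-storage-class", "readOnly": true},
	    "storage-class-3": {"readOnly": true}
	  }
	}`)

	var cfg nodeAgentConfig
	if err := json.Unmarshal(data, &cfg); err != nil {
		panic(err)
	}

	// Per the doc: when a source PVC's storage class has an entry here, the
	// backupPVC uses the configured class/access mode; otherwise the source
	// PVC's class and ReadWriteOnce are used.
	fmt.Printf("%+v\n", cfg.BackupPVC["storage-class-1"])
}
```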
From af62dd4b3e868f8fd9d2d0b6093fefa50e50d396 Mon Sep 17 00:00:00 2001
From: Xun Jiang
Date: Tue, 20 Aug 2024 10:49:56 +0800
Subject: [PATCH 5/5] Modify E2E and perf test result output directory. Add
 LongTime label to more E2E cases.

Signed-off-by: Xun Jiang
---
 changelogs/unreleased/8129-blackpiglet | 1 +
 test/Makefile                          | 4 ++--
 test/e2e/e2e_suite_test.go             | 6 +++---
 3 files changed, 6 insertions(+), 5 deletions(-)
 create mode 100644 changelogs/unreleased/8129-blackpiglet

diff --git a/changelogs/unreleased/8129-blackpiglet b/changelogs/unreleased/8129-blackpiglet
new file mode 100644
index 0000000000..c776b66eb9
--- /dev/null
+++ b/changelogs/unreleased/8129-blackpiglet
@@ -0,0 +1 @@
+Modify the E2E and perf test report output directory
\ No newline at end of file
diff --git a/test/Makefile b/test/Makefile
index ff62230b32..fce0b9d8ee 100644
--- a/test/Makefile
+++ b/test/Makefile
@@ -167,7 +167,7 @@ run-e2e: ginkgo
 		(echo "Cloud provider for target cloud/plugin provider is required, please rerun with CLOUD_PROVIDER="; exit 1)
 	@$(GINKGO) run \
 		-v \
-		--junit-report report.xml \
+		--junit-report e2e/report.xml \
 		--label-filter="$(GINKGO_LABELS)" \
 		--timeout=5h \
 		./e2e \
@@ -207,7 +207,7 @@ run-perf: ginkgo
 		(echo "Cloud provider for target cloud/plugin provider is required, please rerun with CLOUD_PROVIDER="; exit 1)
 	@$(GINKGO) run \
 		-v \
-		--junit-report report.xml \
+		--junit-report perf/report.xml \
 		--label-filter="$(GINKGO_LABELS)" \
 		--timeout=5h \
 		./perf \
diff --git a/test/e2e/e2e_suite_test.go b/test/e2e/e2e_suite_test.go
index 0d22510bc1..1031958084 100644
--- a/test/e2e/e2e_suite_test.go
+++ b/test/e2e/e2e_suite_test.go
@@ -113,7 +113,7 @@ func init() {
 // cases can be executed as expected successful result.
 var _ = Describe("Velero tests with various CRD API group versions",
-	Label("APIGroup", "APIVersion", "SKIP_KIND"), APIGroupVersionsTest)
+	Label("APIGroup", "APIVersion", "SKIP_KIND", "LongTime"), APIGroupVersionsTest)
 var _ = Describe("CRD of apiextentions v1beta1 should be B/R successfully from cluster(k8s version < 1.22) to cluster(k8s version >= 1.22)",
 	Label("APIGroup", "APIExtensions", "SKIP_KIND"), APIExtensionsVersionsTest)
@@ -197,9 +197,9 @@ var _ = Describe("Backups in object storage are synced to a new Velero and delet
 var _ = Describe("Backup will be created periodically by schedule defined by a Cron expression",
 	Label("Schedule", "BR", "Pause", "LongTime"), ScheduleBackupTest)
 var _ = Describe("Backup resources should follow the specific order in schedule",
-	Label("Schedule", "OrderedResources"), ScheduleOrderedResources)
+	Label("Schedule", "OrderedResources", "LongTime"), ScheduleOrderedResources)
 var _ = Describe("Schedule controller wouldn't create a new backup when it still has pending or InProgress backup",
-	Label("Schedule", "BackupCreation", "SKIP_KIND"), ScheduleBackupCreationTest)
+	Label("Schedule", "BackupCreation", "SKIP_KIND", "LongTime"), ScheduleBackupCreationTest)
 var _ = Describe("Velero test on ssr object when controller namespace mix-ups",
 	Label("PrivilegesMgmt", "SSR"), SSRTest)
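With the Makefile change above, the JUnit report for the E2E suite is written to `e2e/report.xml` (and `perf/report.xml` for the perf suite) instead of the working directory root. Assuming the `run-e2e` target and the `CLOUD_PROVIDER`/`GINKGO_LABELS` variables shown in the diff, an invocation that skips the newly labeled long-running cases might look like `make -C test run-e2e CLOUD_PROVIDER=<provider> GINKGO_LABELS='!LongTime'`, since Ginkgo's label-filter syntax supports negation.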