diff --git a/changelogs/unreleased/8119-shubham-pampattiwar b/changelogs/unreleased/8119-shubham-pampattiwar new file mode 100644 index 0000000000..48b4c0b091 --- /dev/null +++ b/changelogs/unreleased/8119-shubham-pampattiwar @@ -0,0 +1 @@ +Add docs for backup pvc config support diff --git a/changelogs/unreleased/8129-blackpiglet b/changelogs/unreleased/8129-blackpiglet new file mode 100644 index 0000000000..c776b66eb9 --- /dev/null +++ b/changelogs/unreleased/8129-blackpiglet @@ -0,0 +1 @@ +Modify E2E and perf test report generated directory \ No newline at end of file diff --git a/pkg/cmd/cli/datamover/backup.go b/pkg/cmd/cli/datamover/backup.go index 35f483d921..4d704b04c1 100644 --- a/pkg/cmd/cli/datamover/backup.go +++ b/pkg/cmd/cli/datamover/backup.go @@ -224,6 +224,7 @@ func (s *dataMoverBackup) runDataPath() { err = dpService.Init() if err != nil { + dpService.Shutdown() s.cancelFunc() funcExitWithMessage(s.logger, false, "Failed to init data path service for DataUpload %s: %v", s.config.duName, err) return @@ -233,6 +234,7 @@ func (s *dataMoverBackup) runDataPath() { result, err := dpService.RunCancelableDataPath(s.ctx) if err != nil { + dpService.Shutdown() s.cancelFunc() funcExitWithMessage(s.logger, false, "Failed to run data path service for DataUpload %s: %v", s.config.duName, err) return diff --git a/pkg/cmd/cli/datamover/restore.go b/pkg/cmd/cli/datamover/restore.go index fb46abd409..244060cc9a 100644 --- a/pkg/cmd/cli/datamover/restore.go +++ b/pkg/cmd/cli/datamover/restore.go @@ -215,6 +215,7 @@ func (s *dataMoverRestore) runDataPath() { err = dpService.Init() if err != nil { + dpService.Shutdown() s.cancelFunc() funcExitWithMessage(s.logger, false, "Failed to init data path service for DataDownload %s: %v", s.config.ddName, err) return @@ -222,6 +223,7 @@ func (s *dataMoverRestore) runDataPath() { result, err := dpService.RunCancelableDataPath(s.ctx) if err != nil { + dpService.Shutdown() s.cancelFunc() funcExitWithMessage(s.logger, false, "Failed to run data path service for DataDownload %s: %v", s.config.ddName, err) return diff --git a/pkg/cmd/cli/nodeagent/server.go b/pkg/cmd/cli/nodeagent/server.go index 4a10634d42..20a4654fb6 100644 --- a/pkg/cmd/cli/nodeagent/server.go +++ b/pkg/cmd/cli/nodeagent/server.go @@ -61,6 +61,8 @@ import ( "github.com/vmware-tanzu/velero/pkg/repository" "github.com/vmware-tanzu/velero/pkg/util/filesystem" "github.com/vmware-tanzu/velero/pkg/util/logging" + + cacheutil "k8s.io/client-go/tools/cache" ) var ( @@ -300,28 +302,29 @@ func (s *nodeAgentServer) run() { backupPVCConfig = s.dataPathConfigs.BackupPVCConfig } - dataUploadReconciler := controller.NewDataUploadReconciler(s.mgr.GetClient(), s.mgr, s.kubeClient, s.csiSnapshotClient.SnapshotV1(), s.dataPathMgr, loadAffinity, backupPVCConfig, repoEnsurer, clock.RealClock{}, credentialGetter, s.nodeName, s.fileSystem, s.config.dataMoverPrepareTimeout, s.logger, s.metrics) + dataUploadReconciler := controller.NewDataUploadReconciler(s.mgr.GetClient(), s.mgr, s.kubeClient, s.csiSnapshotClient.SnapshotV1(), s.dataPathMgr, loadAffinity, backupPVCConfig, clock.RealClock{}, s.nodeName, s.config.dataMoverPrepareTimeout, s.logger, s.metrics) if err = dataUploadReconciler.SetupWithManager(s.mgr); err != nil { s.logger.WithError(err).Fatal("Unable to create the data upload controller") } - dataDownloadReconciler := controller.NewDataDownloadReconciler(s.mgr.GetClient(), s.mgr, s.kubeClient, s.dataPathMgr, repoEnsurer, credentialGetter, s.nodeName, s.config.dataMoverPrepareTimeout, s.logger, 
s.metrics) + dataDownloadReconciler := controller.NewDataDownloadReconciler(s.mgr.GetClient(), s.mgr, s.kubeClient, s.dataPathMgr, s.nodeName, s.config.dataMoverPrepareTimeout, s.logger, s.metrics) if err = dataDownloadReconciler.SetupWithManager(s.mgr); err != nil { s.logger.WithError(err).Fatal("Unable to create the data download controller") } go func() { - s.mgr.GetCache().WaitForCacheSync(s.ctx) - - if err := dataUploadReconciler.AttemptDataUploadResume(s.ctx, s.mgr.GetClient(), s.logger.WithField("node", s.nodeName), s.namespace); err != nil { - s.logger.WithError(errors.WithStack(err)).Error("failed to attempt data upload resume") + if err := s.waitCacheForResume(); err != nil { + s.logger.WithError(err).Error("Failed to wait cache for resume, will not resume DU/DD") + return } - if err := dataDownloadReconciler.AttemptDataDownloadResume(s.ctx, s.mgr.GetClient(), s.logger.WithField("node", s.nodeName), s.namespace); err != nil { - s.logger.WithError(errors.WithStack(err)).Error("failed to attempt data download resume") + if err := dataUploadReconciler.AttemptDataUploadResume(s.ctx, s.logger.WithField("node", s.nodeName), s.namespace); err != nil { + s.logger.WithError(errors.WithStack(err)).Error("Failed to attempt data upload resume") } - s.logger.Info("Attempt complete to resume dataUploads and dataDownloads") + if err := dataDownloadReconciler.AttemptDataDownloadResume(s.ctx, s.logger.WithField("node", s.nodeName), s.namespace); err != nil { + s.logger.WithError(errors.WithStack(err)).Error("Failed to attempt data download resume") + } }() s.logger.Info("Controllers starting...") @@ -331,6 +334,29 @@ func (s *nodeAgentServer) run() { } } +func (s *nodeAgentServer) waitCacheForResume() error { + podInformer, err := s.mgr.GetCache().GetInformer(s.ctx, &v1.Pod{}) + if err != nil { + return errors.Wrap(err, "error getting pod informer") + } + + duInformer, err := s.mgr.GetCache().GetInformer(s.ctx, &velerov2alpha1api.DataUpload{}) + if err != nil { + return errors.Wrap(err, "error getting du informer") + } + + ddInformer, err := s.mgr.GetCache().GetInformer(s.ctx, &velerov2alpha1api.DataDownload{}) + if err != nil { + return errors.Wrap(err, "error getting dd informer") + } + + if !cacheutil.WaitForCacheSync(s.ctx.Done(), podInformer.HasSynced, duInformer.HasSynced, ddInformer.HasSynced) { + return errors.New("error waiting informer synced") + } + + return nil +} + // validatePodVolumesHostPath validates that the pod volumes path contains a // directory for each Pod running on this node func (s *nodeAgentServer) validatePodVolumesHostPath(client kubernetes.Interface) error { diff --git a/pkg/controller/data_download_controller.go b/pkg/controller/data_download_controller.go index ba5ad37539..990d7455b4 100644 --- a/pkg/controller/data_download_controller.go +++ b/pkg/controller/data_download_controller.go @@ -39,7 +39,6 @@ import ( "sigs.k8s.io/controller-runtime/pkg/predicate" "sigs.k8s.io/controller-runtime/pkg/reconcile" - "github.com/vmware-tanzu/velero/internal/credentials" "github.com/vmware-tanzu/velero/pkg/apis/velero/shared" velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" velerov2alpha1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v2alpha1" @@ -47,45 +46,37 @@ import ( "github.com/vmware-tanzu/velero/pkg/datapath" "github.com/vmware-tanzu/velero/pkg/exposer" "github.com/vmware-tanzu/velero/pkg/metrics" - repository "github.com/vmware-tanzu/velero/pkg/repository" "github.com/vmware-tanzu/velero/pkg/uploader" - 
"github.com/vmware-tanzu/velero/pkg/util/filesystem" "github.com/vmware-tanzu/velero/pkg/util/kube" ) // DataDownloadReconciler reconciles a DataDownload object type DataDownloadReconciler struct { - client client.Client - kubeClient kubernetes.Interface - mgr manager.Manager - logger logrus.FieldLogger - credentialGetter *credentials.CredentialGetter - fileSystem filesystem.Interface - Clock clock.WithTickerAndDelayedExecution - restoreExposer exposer.GenericRestoreExposer - nodeName string - repositoryEnsurer *repository.Ensurer - dataPathMgr *datapath.Manager - preparingTimeout time.Duration - metrics *metrics.ServerMetrics + client client.Client + kubeClient kubernetes.Interface + mgr manager.Manager + logger logrus.FieldLogger + Clock clock.WithTickerAndDelayedExecution + restoreExposer exposer.GenericRestoreExposer + nodeName string + dataPathMgr *datapath.Manager + preparingTimeout time.Duration + metrics *metrics.ServerMetrics } func NewDataDownloadReconciler(client client.Client, mgr manager.Manager, kubeClient kubernetes.Interface, dataPathMgr *datapath.Manager, - repoEnsurer *repository.Ensurer, credentialGetter *credentials.CredentialGetter, nodeName string, preparingTimeout time.Duration, logger logrus.FieldLogger, metrics *metrics.ServerMetrics) *DataDownloadReconciler { + nodeName string, preparingTimeout time.Duration, logger logrus.FieldLogger, metrics *metrics.ServerMetrics) *DataDownloadReconciler { return &DataDownloadReconciler{ - client: client, - kubeClient: kubeClient, - mgr: mgr, - logger: logger.WithField("controller", "DataDownload"), - credentialGetter: credentialGetter, - fileSystem: filesystem.NewFileSystem(), - Clock: &clock.RealClock{}, - nodeName: nodeName, - repositoryEnsurer: repoEnsurer, - restoreExposer: exposer.NewGenericRestoreExposer(kubeClient, logger), - dataPathMgr: dataPathMgr, - preparingTimeout: preparingTimeout, - metrics: metrics, + client: client, + kubeClient: kubeClient, + mgr: mgr, + logger: logger.WithField("controller", "DataDownload"), + Clock: &clock.RealClock{}, + nodeName: nodeName, + restoreExposer: exposer.NewGenericRestoreExposer(kubeClient, logger), + dataPathMgr: dataPathMgr, + preparingTimeout: preparingTimeout, + metrics: metrics, } } @@ -225,9 +216,9 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request } else if dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseAccepted { if dd.Spec.Cancel { log.Debugf("Data download is been canceled %s in Phase %s", dd.GetName(), dd.Status.Phase) - r.TryCancelDataDownload(ctx, dd, "") + r.tryCancelAcceptedDataDownload(ctx, dd, "") } else if peekErr := r.restoreExposer.PeekExposed(ctx, getDataDownloadOwnerObject(dd)); peekErr != nil { - r.TryCancelDataDownload(ctx, dd, fmt.Sprintf("found a dataupload %s/%s with expose error: %s. mark it as cancel", dd.Namespace, dd.Name, peekErr)) + r.tryCancelAcceptedDataDownload(ctx, dd, fmt.Sprintf("found a dataupload %s/%s with expose error: %s. 
mark it as cancel", dd.Namespace, dd.Name, peekErr)) log.Errorf("Cancel dd %s/%s because of expose error %s", dd.Namespace, dd.Name, peekErr) } else if dd.Status.StartTimestamp != nil { if time.Since(dd.Status.StartTimestamp.Time) >= r.preparingTimeout { @@ -280,23 +271,35 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request return r.errorOut(ctx, dd, err, "error to create data path", log) } } + + if err := r.initCancelableDataPath(ctx, asyncBR, result, log); err != nil { + log.WithError(err).Errorf("Failed to init cancelable data path for %s", dd.Name) + + r.closeDataPath(ctx, dd.Name) + return r.errorOut(ctx, dd, err, "error initializing data path", log) + } + // Update status to InProgress original := dd.DeepCopy() dd.Status.Phase = velerov2alpha1api.DataDownloadPhaseInProgress dd.Status.StartTimestamp = &metav1.Time{Time: r.Clock.Now()} if err := r.client.Patch(ctx, dd, client.MergeFrom(original)); err != nil { - log.WithError(err).Error("Unable to update status to in progress") - return ctrl.Result{}, err + log.WithError(err).Warnf("Failed to update datadownload %s to InProgress, will close data path and retry", dd.Name) + + r.closeDataPath(ctx, dd.Name) + return ctrl.Result{Requeue: true, RequeueAfter: time.Second * 5}, nil } log.Info("Data download is marked as in progress") - reconcileResult, err := r.runCancelableDataPath(ctx, asyncBR, dd, result, log) - if err != nil { - log.Errorf("Failed to run cancelable data path for %s with err %v", dd.Name, err) + if err := r.startCancelableDataPath(asyncBR, dd, result, log); err != nil { + log.WithError(err).Errorf("Failed to start cancelable data path for %s", dd.Name) + r.closeDataPath(ctx, dd.Name) + return r.errorOut(ctx, dd, err, "error starting data path", log) } - return reconcileResult, err + + return ctrl.Result{}, nil } else if dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseInProgress { log.Info("Data download is in progress") if dd.Spec.Cancel { @@ -339,27 +342,33 @@ func (r *DataDownloadReconciler) Reconcile(ctx context.Context, req ctrl.Request } } -func (r *DataDownloadReconciler) runCancelableDataPath(ctx context.Context, asyncBR datapath.AsyncBR, dd *velerov2alpha1api.DataDownload, res *exposer.ExposeResult, log logrus.FieldLogger) (reconcile.Result, error) { +func (r *DataDownloadReconciler) initCancelableDataPath(ctx context.Context, asyncBR datapath.AsyncBR, res *exposer.ExposeResult, log logrus.FieldLogger) error { + log.Info("Init cancelable dataDownload") + if err := asyncBR.Init(ctx, nil); err != nil { - return r.errorOut(ctx, dd, err, "error to initialize asyncBR", log) + return errors.Wrap(err, "error initializing asyncBR") } log.Infof("async restore init for pod %s, volume %s", res.ByPod.HostingPod.Name, res.ByPod.VolumeName) + return nil +} + +func (r *DataDownloadReconciler) startCancelableDataPath(asyncBR datapath.AsyncBR, dd *velerov2alpha1api.DataDownload, res *exposer.ExposeResult, log logrus.FieldLogger) error { + log.Info("Start cancelable dataDownload") + if err := asyncBR.StartRestore(dd.Spec.SnapshotID, datapath.AccessPoint{ ByPath: res.ByPod.VolumeName, }, dd.Spec.DataMoverConfig); err != nil { - return r.errorOut(ctx, dd, err, fmt.Sprintf("error starting async restore for pod %s, volume %s", res.ByPod.HostingPod, res.ByPod.VolumeName), log) + return errors.Wrapf(err, "error starting async restore for pod %s, volume %s", res.ByPod.HostingPod.Name, res.ByPod.VolumeName) } - log.Infof("Async restore started for pod %s, volume %s", res.ByPod.HostingPod, 
res.ByPod.VolumeName) - return ctrl.Result{}, nil + log.Infof("Async restore started for pod %s, volume %s", res.ByPod.HostingPod.Name, res.ByPod.VolumeName) + return nil } func (r *DataDownloadReconciler) OnDataDownloadCompleted(ctx context.Context, namespace string, ddName string, result datapath.Result) { - defer func() { - go r.closeDataPath(ctx, ddName) - }() + defer r.dataPathMgr.RemoveAsyncBR(ddName) log := r.logger.WithField("datadownload", ddName) log.Info("Async fs restore data path completed") @@ -392,9 +401,7 @@ func (r *DataDownloadReconciler) OnDataDownloadCompleted(ctx context.Context, na } func (r *DataDownloadReconciler) OnDataDownloadFailed(ctx context.Context, namespace string, ddName string, err error) { - defer func() { - go r.closeDataPath(ctx, ddName) - }() + defer r.dataPathMgr.RemoveAsyncBR(ddName) log := r.logger.WithField("datadownload", ddName) @@ -404,16 +411,12 @@ func (r *DataDownloadReconciler) OnDataDownloadFailed(ctx context.Context, names if getErr := r.client.Get(ctx, types.NamespacedName{Name: ddName, Namespace: namespace}, &dd); getErr != nil { log.WithError(getErr).Warn("Failed to get data download on failure") } else { - if _, errOut := r.errorOut(ctx, &dd, err, "data path restore failed", log); err != nil { - log.WithError(err).Warnf("Failed to patch data download with err %v", errOut) - } + _, _ = r.errorOut(ctx, &dd, err, "data path restore failed", log) } } func (r *DataDownloadReconciler) OnDataDownloadCancelled(ctx context.Context, namespace string, ddName string) { - defer func() { - go r.closeDataPath(ctx, ddName) - }() + defer r.dataPathMgr.RemoveAsyncBR(ddName) log := r.logger.WithField("datadownload", ddName) @@ -440,9 +443,9 @@ func (r *DataDownloadReconciler) OnDataDownloadCancelled(ctx context.Context, na } } -func (r *DataDownloadReconciler) TryCancelDataDownload(ctx context.Context, dd *velerov2alpha1api.DataDownload, message string) { +func (r *DataDownloadReconciler) tryCancelAcceptedDataDownload(ctx context.Context, dd *velerov2alpha1api.DataDownload, message string) { log := r.logger.WithField("datadownload", dd.Name) - log.Warn("Async fs backup data path canceled") + log.Warn("Accepted data download is canceled") succeeded, err := r.exclusiveUpdateDataDownload(ctx, dd, func(dataDownload *velerov2alpha1api.DataDownload) { dataDownload.Status.Phase = velerov2alpha1api.DataDownloadPhaseCanceled @@ -450,7 +453,10 @@ func (r *DataDownloadReconciler) TryCancelDataDownload(ctx context.Context, dd * dataDownload.Status.StartTimestamp = &metav1.Time{Time: r.Clock.Now()} } dataDownload.Status.CompletionTimestamp = &metav1.Time{Time: r.Clock.Now()} - dataDownload.Status.Message = message + + if message != "" { + dataDownload.Status.Message = message + } }) if err != nil { @@ -464,7 +470,6 @@ func (r *DataDownloadReconciler) TryCancelDataDownload(ctx context.Context, dd * // success update r.metrics.RegisterDataDownloadCancel(r.nodeName) r.restoreExposer.CleanUp(ctx, getDataDownloadOwnerObject(dd)) - r.closeDataPath(ctx, dd.Name) } func (r *DataDownloadReconciler) OnDataDownloadProgress(ctx context.Context, namespace string, ddName string, progress *uploader.Progress) { @@ -762,9 +767,9 @@ func UpdateDataDownloadWithRetry(ctx context.Context, client client.Client, name var funcResumeCancellableDataRestore = (*DataDownloadReconciler).resumeCancellableDataPath -func (r *DataDownloadReconciler) AttemptDataDownloadResume(ctx context.Context, cli client.Client, logger *logrus.Entry, ns string) error { +func (r *DataDownloadReconciler) 
AttemptDataDownloadResume(ctx context.Context, logger *logrus.Entry, ns string) error { dataDownloads := &velerov2alpha1api.DataDownloadList{} - if err := cli.List(ctx, dataDownloads, &client.ListOptions{Namespace: ns}); err != nil { + if err := r.client.List(ctx, dataDownloads, &client.ListOptions{Namespace: ns}); err != nil { r.logger.WithError(errors.WithStack(err)).Error("failed to list datadownloads") return errors.Wrapf(err, "error to list datadownloads") } @@ -783,13 +788,14 @@ func (r *DataDownloadReconciler) AttemptDataDownloadResume(ctx context.Context, err := funcResumeCancellableDataRestore(r, ctx, dd, logger) if err == nil { + logger.WithField("dd", dd.Name).WithField("current node", r.nodeName).Info("Completed to resume in progress DD") continue } logger.WithField("datadownload", dd.GetName()).WithError(err).Warn("Failed to resume data path for dd, have to cancel it") resumeErr := err - err = UpdateDataDownloadWithRetry(ctx, cli, types.NamespacedName{Namespace: dd.Namespace, Name: dd.Name}, logger.WithField("datadownload", dd.Name), + err = UpdateDataDownloadWithRetry(ctx, r.client, types.NamespacedName{Namespace: dd.Namespace, Name: dd.Name}, logger.WithField("datadownload", dd.Name), func(dataDownload *velerov2alpha1api.DataDownload) bool { if dataDownload.Spec.Cancel { return false @@ -806,7 +812,7 @@ func (r *DataDownloadReconciler) AttemptDataDownloadResume(ctx context.Context, } else if dd.Status.Phase == velerov2alpha1api.DataDownloadPhaseAccepted { r.logger.WithField("datadownload", dd.GetName()).Warn("Cancel dd under Accepted phase") - err := UpdateDataDownloadWithRetry(ctx, cli, types.NamespacedName{Namespace: dd.Namespace, Name: dd.Name}, + err := UpdateDataDownloadWithRetry(ctx, r.client, types.NamespacedName{Namespace: dd.Namespace, Name: dd.Name}, r.logger.WithField("datadownload", dd.Name), func(dataDownload *velerov2alpha1api.DataDownload) bool { if dataDownload.Spec.Cancel { return false diff --git a/pkg/controller/data_download_controller_test.go b/pkg/controller/data_download_controller_test.go index 7e8a1f6918..a54135d034 100644 --- a/pkg/controller/data_download_controller_test.go +++ b/pkg/controller/data_download_controller_test.go @@ -43,7 +43,6 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client/fake" - "github.com/vmware-tanzu/velero/internal/credentials" velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" velerov2alpha1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v2alpha1" "github.com/vmware-tanzu/velero/pkg/builder" @@ -139,19 +138,9 @@ func initDataDownloadReconcilerWithError(objects []runtime.Object, needError ... 
return nil, err } - credentialFileStore, err := credentials.NewNamespacedFileStore( - fakeClient, - velerov1api.DefaultNamespace, - "/tmp/credentials", - fakeFS, - ) - if err != nil { - return nil, err - } - dataPathMgr := datapath.NewManager(1) - return NewDataDownloadReconciler(fakeClient, nil, fakeKubeClient, dataPathMgr, nil, &credentials.CredentialGetter{FromFile: credentialFileStore}, "test-node", time.Minute*5, velerotest.NewLogger(), metrics.NewServerMetrics()), nil + return NewDataDownloadReconciler(fakeClient, nil, fakeKubeClient, dataPathMgr, "test-node", time.Minute*5, velerotest.NewLogger(), metrics.NewServerMetrics()), nil } func TestDataDownloadReconcile(t *testing.T) { @@ -192,6 +181,10 @@ func TestDataDownloadReconcile(t *testing.T) { isFSBRRestoreErr bool notNilExpose bool notMockCleanUp bool + mockInit bool + mockInitErr error + mockStart bool + mockStartErr error mockCancel bool mockClose bool expected *velerov2alpha1api.DataDownload @@ -264,13 +257,36 @@ func TestDataDownloadReconcile(t *testing.T) { expectedResult: &ctrl.Result{Requeue: true, RequeueAfter: time.Second * 5}, }, { - name: "Unable to update status to in progress for data download", + name: "data path init error", dd: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhasePrepared).Result(), targetPVC: builder.ForPersistentVolumeClaim("test-ns", "test-pvc").Result(), - needErrs: []bool{false, false, false, true}, + mockInit: true, + mockInitErr: errors.New("fake-data-path-init-error"), + mockClose: true, notNilExpose: true, - notMockCleanUp: true, - expectedStatusMsg: "Patch error", + expectedStatusMsg: "error initializing asyncBR: fake-data-path-init-error", + }, + { + name: "Unable to update status to in progress for data download", + dd: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhasePrepared).Result(), + targetPVC: builder.ForPersistentVolumeClaim("test-ns", "test-pvc").Result(), + needErrs: []bool{false, false, false, true}, + mockInit: true, + mockClose: true, + notNilExpose: true, + notMockCleanUp: true, + expectedResult: &ctrl.Result{Requeue: true, RequeueAfter: time.Second * 5}, + }, + { + name: "data path start error", + dd: dataDownloadBuilder().Phase(velerov2alpha1api.DataDownloadPhasePrepared).Result(), + targetPVC: builder.ForPersistentVolumeClaim("test-ns", "test-pvc").Result(), + mockInit: true, + mockStart: true, + mockStartErr: errors.New("fake-data-path-start-error"), + mockClose: true, + notNilExpose: true, + expectedStatusMsg: "error starting async restore for pod test-name, volume test-pvc: fake-data-path-start-error", }, { name: "accept DataDownload error", @@ -399,6 +415,14 @@ func TestDataDownloadReconcile(t *testing.T) { datapath.MicroServiceBRWatcherCreator = func(kbclient.Client, kubernetes.Interface, manager.Manager, string, string, string, string, string, string, datapath.Callbacks, logrus.FieldLogger) datapath.AsyncBR { asyncBR := datapathmockes.NewAsyncBR(t) + if test.mockInit { + asyncBR.On("Init", mock.Anything, mock.Anything).Return(test.mockInitErr) + } + + if test.mockStart { + asyncBR.On("StartRestore", mock.Anything, mock.Anything, mock.Anything).Return(test.mockStartErr) + } + if test.mockCancel { asyncBR.On("Cancel").Return() } @@ -488,6 +512,10 @@ func TestDataDownloadReconcile(t *testing.T) { assert.True(t, true, apierrors.IsNotFound(err)) } + if !test.needCreateFSBR { + assert.Nil(t, r.dataPathMgr.GetAsyncBR(test.dd.Name)) + } + t.Logf("%s: \n %v \n", test.name, dd) }) } @@ -845,7 +873,7 @@ func TestTryCancelDataDownload(t *testing.T) { 
err = r.client.Create(ctx, test.dd) require.NoError(t, err) - r.TryCancelDataDownload(ctx, test.dd, "") + r.tryCancelAcceptedDataDownload(ctx, test.dd, "") if test.expectedErr == "" { assert.NoError(t, err) @@ -1051,7 +1079,7 @@ func TestAttemptDataDownloadResume(t *testing.T) { funcResumeCancellableDataBackup = dt.resumeCancellableDataPath // Run the test - err = r.AttemptDataDownloadResume(ctx, r.client, r.logger.WithField("name", test.name), test.dd.Namespace) + err = r.AttemptDataDownloadResume(ctx, r.logger.WithField("name", test.name), test.dd.Namespace) if test.expectedError != "" { assert.EqualError(t, err, test.expectedError) diff --git a/pkg/controller/data_upload_controller.go b/pkg/controller/data_upload_controller.go index 520e487957..22dc59bd53 100644 --- a/pkg/controller/data_upload_controller.go +++ b/pkg/controller/data_upload_controller.go @@ -41,7 +41,6 @@ import ( snapshotter "github.com/kubernetes-csi/external-snapshotter/client/v7/clientset/versioned/typed/volumesnapshot/v1" - "github.com/vmware-tanzu/velero/internal/credentials" "github.com/vmware-tanzu/velero/pkg/apis/velero/shared" velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" velerov2alpha1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v2alpha1" @@ -50,9 +49,7 @@ import ( "github.com/vmware-tanzu/velero/pkg/exposer" "github.com/vmware-tanzu/velero/pkg/metrics" "github.com/vmware-tanzu/velero/pkg/nodeagent" - "github.com/vmware-tanzu/velero/pkg/repository" "github.com/vmware-tanzu/velero/pkg/uploader" - "github.com/vmware-tanzu/velero/pkg/util/filesystem" "github.com/vmware-tanzu/velero/pkg/util/kube" ) @@ -69,11 +66,8 @@ type DataUploadReconciler struct { kubeClient kubernetes.Interface csiSnapshotClient snapshotter.SnapshotV1Interface mgr manager.Manager - repoEnsurer *repository.Ensurer Clock clocks.WithTickerAndDelayedExecution - credentialGetter *credentials.CredentialGetter nodeName string - fileSystem filesystem.Interface logger logrus.FieldLogger snapshotExposerList map[velerov2alpha1api.SnapshotType]exposer.SnapshotExposer dataPathMgr *datapath.Manager @@ -84,19 +78,16 @@ type DataUploadReconciler struct { } func NewDataUploadReconciler(client client.Client, mgr manager.Manager, kubeClient kubernetes.Interface, csiSnapshotClient snapshotter.SnapshotV1Interface, - dataPathMgr *datapath.Manager, loadAffinity *nodeagent.LoadAffinity, backupPVCConfig map[string]nodeagent.BackupPVC, repoEnsurer *repository.Ensurer, clock clocks.WithTickerAndDelayedExecution, - cred *credentials.CredentialGetter, nodeName string, fs filesystem.Interface, preparingTimeout time.Duration, log logrus.FieldLogger, metrics *metrics.ServerMetrics) *DataUploadReconciler { + dataPathMgr *datapath.Manager, loadAffinity *nodeagent.LoadAffinity, backupPVCConfig map[string]nodeagent.BackupPVC, clock clocks.WithTickerAndDelayedExecution, + nodeName string, preparingTimeout time.Duration, log logrus.FieldLogger, metrics *metrics.ServerMetrics) *DataUploadReconciler { return &DataUploadReconciler{ client: client, mgr: mgr, kubeClient: kubeClient, csiSnapshotClient: csiSnapshotClient, Clock: clock, - credentialGetter: cred, nodeName: nodeName, - fileSystem: fs, logger: log, - repoEnsurer: repoEnsurer, snapshotExposerList: map[velerov2alpha1api.SnapshotType]exposer.SnapshotExposer{velerov2alpha1api.SnapshotTypeCSI: exposer.NewCSISnapshotExposer(kubeClient, csiSnapshotClient, log)}, dataPathMgr: dataPathMgr, loadAffinity: loadAffinity, @@ -240,9 +231,9 @@ func (r *DataUploadReconciler) Reconcile(ctx context.Context, req 
ctrl.Request) // we don't want to update CR into cancel status forcely as it may conflict with CR update in Expose action // we could retry when the CR requeue in periodcally log.Debugf("Data upload is been canceled %s in Phase %s", du.GetName(), du.Status.Phase) - r.TryCancelDataUpload(ctx, du, "") + r.tryCancelAcceptedDataUpload(ctx, du, "") } else if peekErr := ep.PeekExposed(ctx, getOwnerObject(du)); peekErr != nil { - r.TryCancelDataUpload(ctx, du, fmt.Sprintf("found a dataupload %s/%s with expose error: %s. mark it as cancel", du.Namespace, du.Name, peekErr)) + r.tryCancelAcceptedDataUpload(ctx, du, fmt.Sprintf("found a dataupload %s/%s with expose error: %s. mark it as cancel", du.Namespace, du.Name, peekErr)) log.Errorf("Cancel du %s/%s because of expose error %s", du.Namespace, du.Name, peekErr) } else if du.Status.StartTimestamp != nil { if time.Since(du.Status.StartTimestamp.Time) >= r.preparingTimeout { @@ -293,21 +284,35 @@ func (r *DataUploadReconciler) Reconcile(ctx context.Context, req ctrl.Request) return r.errorOut(ctx, du, err, "error to create data path", log) } } + + if err := r.initCancelableDataPath(ctx, asyncBR, res, log); err != nil { + log.WithError(err).Errorf("Failed to init cancelable data path for %s", du.Name) + + r.closeDataPath(ctx, du.Name) + return r.errorOut(ctx, du, err, "error initializing data path", log) + } + // Update status to InProgress original := du.DeepCopy() du.Status.Phase = velerov2alpha1api.DataUploadPhaseInProgress du.Status.StartTimestamp = &metav1.Time{Time: r.Clock.Now()} if err := r.client.Patch(ctx, du, client.MergeFrom(original)); err != nil { - return r.errorOut(ctx, du, err, "error updating dataupload status", log) + log.WithError(err).Warnf("Failed to update dataupload %s to InProgress, will data path close and retry", du.Name) + + r.closeDataPath(ctx, du.Name) + return ctrl.Result{Requeue: true, RequeueAfter: time.Second * 5}, nil } log.Info("Data upload is marked as in progress") - result, err := r.runCancelableDataUpload(ctx, asyncBR, du, res, log) - if err != nil { - log.Errorf("Failed to run cancelable data path for %s with err %v", du.Name, err) + + if err := r.startCancelableDataPath(asyncBR, du, res, log); err != nil { + log.WithError(err).Errorf("Failed to start cancelable data path for %s", du.Name) r.closeDataPath(ctx, du.Name) + + return r.errorOut(ctx, du, err, "error starting data path", log) } - return result, err + + return ctrl.Result{}, nil } else if du.Status.Phase == velerov2alpha1api.DataUploadPhaseInProgress { log.Info("Data upload is in progress") if du.Spec.Cancel { @@ -350,29 +355,33 @@ func (r *DataUploadReconciler) Reconcile(ctx context.Context, req ctrl.Request) } } -func (r *DataUploadReconciler) runCancelableDataUpload(ctx context.Context, asyncBR datapath.AsyncBR, du *velerov2alpha1api.DataUpload, res *exposer.ExposeResult, log logrus.FieldLogger) (reconcile.Result, error) { - log.Info("Run cancelable dataUpload") +func (r *DataUploadReconciler) initCancelableDataPath(ctx context.Context, asyncBR datapath.AsyncBR, res *exposer.ExposeResult, log logrus.FieldLogger) error { + log.Info("Init cancelable dataUpload") if err := asyncBR.Init(ctx, nil); err != nil { - return r.errorOut(ctx, du, err, "error to initialize asyncBR", log) + return errors.Wrap(err, "error initializing asyncBR") } log.Infof("async backup init for pod %s, volume %s", res.ByPod.HostingPod.Name, res.ByPod.VolumeName) + return nil +} + +func (r *DataUploadReconciler) startCancelableDataPath(asyncBR datapath.AsyncBR, du 
*velerov2alpha1api.DataUpload, res *exposer.ExposeResult, log logrus.FieldLogger) error { + log.Info("Start cancelable dataUpload") + if err := asyncBR.StartBackup(datapath.AccessPoint{ ByPath: res.ByPod.VolumeName, }, du.Spec.DataMoverConfig, nil); err != nil { - return r.errorOut(ctx, du, err, fmt.Sprintf("error starting async backup for pod %s, volume %s", res.ByPod.HostingPod, res.ByPod.VolumeName), log) + return errors.Wrapf(err, "error starting async backup for pod %s, volume %s", res.ByPod.HostingPod.Name, res.ByPod.VolumeName) } - log.Infof("Async backup started for pod %s, volume %s", res.ByPod.HostingPod, res.ByPod.VolumeName) - return ctrl.Result{}, nil + log.Infof("Async backup started for pod %s, volume %s", res.ByPod.HostingPod.Name, res.ByPod.VolumeName) + return nil } func (r *DataUploadReconciler) OnDataUploadCompleted(ctx context.Context, namespace string, duName string, result datapath.Result) { - defer func() { - go r.closeDataPath(ctx, duName) - }() + defer r.dataPathMgr.RemoveAsyncBR(duName) log := r.logger.WithField("dataupload", duName) @@ -416,9 +425,7 @@ func (r *DataUploadReconciler) OnDataUploadCompleted(ctx context.Context, namesp } func (r *DataUploadReconciler) OnDataUploadFailed(ctx context.Context, namespace, duName string, err error) { - defer func() { - go r.closeDataPath(ctx, duName) - }() + defer r.dataPathMgr.RemoveAsyncBR(duName) log := r.logger.WithField("dataupload", duName) @@ -428,16 +435,12 @@ func (r *DataUploadReconciler) OnDataUploadFailed(ctx context.Context, namespace if getErr := r.client.Get(ctx, types.NamespacedName{Name: duName, Namespace: namespace}, &du); getErr != nil { log.WithError(getErr).Warn("Failed to get dataupload on failure") } else { - if _, errOut := r.errorOut(ctx, &du, err, "data path backup failed", log); err != nil { - log.WithError(err).Warnf("Failed to patch dataupload with err %v", errOut) - } + _, _ = r.errorOut(ctx, &du, err, "data path backup failed", log) } } func (r *DataUploadReconciler) OnDataUploadCancelled(ctx context.Context, namespace string, duName string) { - defer func() { - go r.closeDataPath(ctx, duName) - }() + defer r.dataPathMgr.RemoveAsyncBR(duName) log := r.logger.WithField("dataupload", duName) @@ -463,17 +466,19 @@ func (r *DataUploadReconciler) OnDataUploadCancelled(ctx context.Context, namesp } } -// TryCancelDataUpload clear up resources only when update success -func (r *DataUploadReconciler) TryCancelDataUpload(ctx context.Context, du *velerov2alpha1api.DataUpload, message string) { +func (r *DataUploadReconciler) tryCancelAcceptedDataUpload(ctx context.Context, du *velerov2alpha1api.DataUpload, message string) { log := r.logger.WithField("dataupload", du.Name) - log.Warn("Async fs backup data path canceled") + log.Warn("Accepted data upload is canceled") succeeded, err := r.exclusiveUpdateDataUpload(ctx, du, func(dataUpload *velerov2alpha1api.DataUpload) { dataUpload.Status.Phase = velerov2alpha1api.DataUploadPhaseCanceled if dataUpload.Status.StartTimestamp.IsZero() { dataUpload.Status.StartTimestamp = &metav1.Time{Time: r.Clock.Now()} } dataUpload.Status.CompletionTimestamp = &metav1.Time{Time: r.Clock.Now()} - dataUpload.Status.Message = message + + if message != "" { + dataUpload.Status.Message = message + } }) if err != nil { @@ -488,10 +493,9 @@ func (r *DataUploadReconciler) TryCancelDataUpload(ctx context.Context, du *vele r.metrics.RegisterDataUploadCancel(r.nodeName) // cleans up any objects generated during the snapshot expose r.cleanUp(ctx, du, log) - r.closeDataPath(ctx, 
du.Name) } -func (r *DataUploadReconciler) cleanUp(ctx context.Context, du *velerov2alpha1api.DataUpload, log *logrus.Entry) { +func (r *DataUploadReconciler) cleanUp(ctx context.Context, du *velerov2alpha1api.DataUpload, log logrus.FieldLogger) { ep, ok := r.snapshotExposerList[du.Spec.SnapshotType] if !ok { log.WithError(fmt.Errorf("%v type of snapshot exposer is not exist", du.Spec.SnapshotType)). @@ -639,7 +643,7 @@ func (r *DataUploadReconciler) errorOut(ctx context.Context, du *velerov2alpha1a } se.CleanUp(ctx, getOwnerObject(du), volumeSnapshotName, du.Spec.SourceNamespace) } else { - err = errors.Wrapf(err, "failed to clean up exposed snapshot with could not find %s snapshot exposer", du.Spec.SnapshotType) + log.Errorf("failed to clean up exposed snapshot could not find %s snapshot exposer", du.Spec.SnapshotType) } return ctrl.Result{}, r.updateStatusToFailed(ctx, du, err, msg, log) @@ -863,9 +867,9 @@ func UpdateDataUploadWithRetry(ctx context.Context, client client.Client, namesp var funcResumeCancellableDataBackup = (*DataUploadReconciler).resumeCancellableDataPath -func (r *DataUploadReconciler) AttemptDataUploadResume(ctx context.Context, cli client.Client, logger *logrus.Entry, ns string) error { +func (r *DataUploadReconciler) AttemptDataUploadResume(ctx context.Context, logger *logrus.Entry, ns string) error { dataUploads := &velerov2alpha1api.DataUploadList{} - if err := cli.List(ctx, dataUploads, &client.ListOptions{Namespace: ns}); err != nil { + if err := r.client.List(ctx, dataUploads, &client.ListOptions{Namespace: ns}); err != nil { r.logger.WithError(errors.WithStack(err)).Error("failed to list datauploads") return errors.Wrapf(err, "error to list datauploads") } @@ -884,13 +888,14 @@ func (r *DataUploadReconciler) AttemptDataUploadResume(ctx context.Context, cli err := funcResumeCancellableDataBackup(r, ctx, du, logger) if err == nil { + logger.WithField("du", du.Name).WithField("current node", r.nodeName).Info("Completed to resume in progress DU") continue } logger.WithField("dataupload", du.GetName()).WithError(err).Warn("Failed to resume data path for du, have to cancel it") resumeErr := err - err = UpdateDataUploadWithRetry(ctx, cli, types.NamespacedName{Namespace: du.Namespace, Name: du.Name}, logger.WithField("dataupload", du.Name), + err = UpdateDataUploadWithRetry(ctx, r.client, types.NamespacedName{Namespace: du.Namespace, Name: du.Name}, logger.WithField("dataupload", du.Name), func(dataUpload *velerov2alpha1api.DataUpload) bool { if dataUpload.Spec.Cancel { return false @@ -907,7 +912,7 @@ func (r *DataUploadReconciler) AttemptDataUploadResume(ctx context.Context, cli } else if du.Status.Phase == velerov2alpha1api.DataUploadPhaseAccepted { r.logger.WithField("dataupload", du.GetName()).Warn("Cancel du under Accepted phase") - err := UpdateDataUploadWithRetry(ctx, cli, types.NamespacedName{Namespace: du.Namespace, Name: du.Name}, r.logger.WithField("dataupload", du.Name), + err := UpdateDataUploadWithRetry(ctx, r.client, types.NamespacedName{Namespace: du.Namespace, Name: du.Name}, r.logger.WithField("dataupload", du.Name), func(dataUpload *velerov2alpha1api.DataUpload) bool { if dataUpload.Spec.Cancel { return false diff --git a/pkg/controller/data_upload_controller_test.go b/pkg/controller/data_upload_controller_test.go index 2a8d55010a..a6ee25574d 100644 --- a/pkg/controller/data_upload_controller_test.go +++ b/pkg/controller/data_upload_controller_test.go @@ -49,7 +49,6 @@ import ( "sigs.k8s.io/controller-runtime/pkg/manager" 
"sigs.k8s.io/controller-runtime/pkg/reconcile" - "github.com/vmware-tanzu/velero/internal/credentials" velerov1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" velerov2alpha1api "github.com/vmware-tanzu/velero/pkg/apis/velero/v2alpha1" "github.com/vmware-tanzu/velero/pkg/builder" @@ -231,24 +230,9 @@ func initDataUploaderReconcilerWithError(needError ...error) (*DataUploadReconci fakeSnapshotClient := snapshotFake.NewSimpleClientset(vsObject, vscObj) fakeKubeClient := clientgofake.NewSimpleClientset(daemonSet) - fakeFS := velerotest.NewFakeFileSystem() - pathGlob := fmt.Sprintf("/host_pods/%s/volumes/*/%s", "", dataUploadName) - _, err = fakeFS.Create(pathGlob) - if err != nil { - return nil, err - } - credentialFileStore, err := credentials.NewNamespacedFileStore( - fakeClient, - velerov1api.DefaultNamespace, - "/tmp/credentials", - fakeFS, - ) - if err != nil { - return nil, err - } - return NewDataUploadReconciler(fakeClient, nil, fakeKubeClient, fakeSnapshotClient.SnapshotV1(), dataPathMgr, nil, map[string]nodeagent.BackupPVC{}, nil, - testclocks.NewFakeClock(now), &credentials.CredentialGetter{FromFile: credentialFileStore}, "test-node", fakeFS, time.Minute*5, velerotest.NewLogger(), metrics.NewServerMetrics()), nil + return NewDataUploadReconciler(fakeClient, nil, fakeKubeClient, fakeSnapshotClient.SnapshotV1(), dataPathMgr, nil, map[string]nodeagent.BackupPVC{}, + testclocks.NewFakeClock(now), "test-node", time.Minute*5, velerotest.NewLogger(), metrics.NewServerMetrics()), nil } func dataUploadBuilder() *builder.DataUploadBuilder { @@ -310,20 +294,16 @@ type fakeDataUploadFSBR struct { du *velerov2alpha1api.DataUpload kubeClient kbclient.Client clock clock.WithTickerAndDelayedExecution + initErr error + startErr error } func (f *fakeDataUploadFSBR) Init(ctx context.Context, param interface{}) error { - return nil + return f.initErr } func (f *fakeDataUploadFSBR) StartBackup(source datapath.AccessPoint, uploaderConfigs map[string]string, param interface{}) error { - du := f.du - original := f.du.DeepCopy() - du.Status.Phase = velerov2alpha1api.DataUploadPhaseCompleted - du.Status.CompletionTimestamp = &metav1.Time{Time: f.clock.Now()} - f.kubeClient.Patch(context.Background(), du, kbclient.MergeFrom(original)) - - return nil + return f.startErr } func (f *fakeDataUploadFSBR) StartRestore(snapshotID string, target datapath.AccessPoint, uploaderConfigs map[string]string) error { @@ -352,27 +332,24 @@ func TestReconcile(t *testing.T) { needErrs []bool peekErr error notCreateFSBR bool + fsBRInitErr error + fsBRStartErr error }{ { - name: "Dataupload is not initialized", - du: builder.ForDataUpload("unknown-ns", "unknown-name").Result(), - expectedProcessed: false, - expected: nil, - expectedRequeue: ctrl.Result{}, + name: "Dataupload is not initialized", + du: builder.ForDataUpload("unknown-ns", "unknown-name").Result(), + expectedRequeue: ctrl.Result{}, }, { - name: "Error get Dataupload", - du: builder.ForDataUpload(velerov1api.DefaultNamespace, "unknown-name").Result(), - expectedProcessed: false, - expected: nil, - expectedRequeue: ctrl.Result{}, - expectedErrMsg: "getting DataUpload: Get error", - needErrs: []bool{true, false, false, false}, + name: "Error get Dataupload", + du: builder.ForDataUpload(velerov1api.DefaultNamespace, "unknown-name").Result(), + expectedRequeue: ctrl.Result{}, + expectedErrMsg: "getting DataUpload: Get error", + needErrs: []bool{true, false, false, false}, }, { - name: "Unsupported data mover type", - du: dataUploadBuilder().DataMover("unknown 
type").Result(), - expectedProcessed: false, - expected: dataUploadBuilder().Phase("").Result(), - expectedRequeue: ctrl.Result{}, + name: "Unsupported data mover type", + du: dataUploadBuilder().DataMover("unknown type").Result(), + expected: dataUploadBuilder().Phase("").Result(), + expectedRequeue: ctrl.Result{}, }, { name: "Unknown type of snapshot exposer is not initialized", du: dataUploadBuilder().SnapshotType("unknown type").Result(), @@ -381,13 +358,12 @@ func TestReconcile(t *testing.T) { expectedRequeue: ctrl.Result{}, expectedErrMsg: "unknown type type of snapshot exposer is not exist", }, { - name: "Dataupload should be accepted", - du: dataUploadBuilder().Result(), - pod: builder.ForPod("fake-ns", dataUploadName).Volumes(&corev1.Volume{Name: "test-pvc"}).Result(), - pvc: builder.ForPersistentVolumeClaim("fake-ns", "test-pvc").Result(), - expectedProcessed: false, - expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseAccepted).Result(), - expectedRequeue: ctrl.Result{}, + name: "Dataupload should be accepted", + du: dataUploadBuilder().Result(), + pod: builder.ForPod("fake-ns", dataUploadName).Volumes(&corev1.Volume{Name: "test-pvc"}).Result(), + pvc: builder.ForPersistentVolumeClaim("fake-ns", "test-pvc").Result(), + expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseAccepted).Result(), + expectedRequeue: ctrl.Result{}, }, { name: "Dataupload should fail to get PVC information", @@ -399,34 +375,31 @@ func TestReconcile(t *testing.T) { expectedErrMsg: "failed to get PVC", }, { - name: "Dataupload should be prepared", - du: dataUploadBuilder().SnapshotType(fakeSnapshotType).Result(), - expectedProcessed: false, - expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhasePrepared).Result(), - expectedRequeue: ctrl.Result{}, - }, { - name: "Dataupload prepared should be completed", - pod: builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Volumes(&corev1.Volume{Name: "dataupload-1"}).Result(), - du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhasePrepared).SnapshotType(fakeSnapshotType).Result(), - expectedProcessed: true, - expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseCompleted).Result(), - expectedRequeue: ctrl.Result{}, + name: "Dataupload should be prepared", + du: dataUploadBuilder().SnapshotType(fakeSnapshotType).Result(), + expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhasePrepared).Result(), + expectedRequeue: ctrl.Result{}, }, { - name: "Dataupload with not enabled cancel", - pod: builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Volumes(&corev1.Volume{Name: "dataupload-1"}).Result(), - du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseInProgress).SnapshotType(fakeSnapshotType).Cancel(false).Result(), - expectedProcessed: false, - expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseInProgress).Result(), - expectedRequeue: ctrl.Result{}, + name: "Dataupload prepared should be completed", + pod: builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Volumes(&corev1.Volume{Name: "dataupload-1"}).Result(), + du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhasePrepared).SnapshotType(fakeSnapshotType).Result(), + expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseInProgress).Result(), + expectedRequeue: ctrl.Result{}, }, { - name: "Dataupload should be cancel", - pod: builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Volumes(&corev1.Volume{Name: "dataupload-1"}).Result(), - du: 
dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseInProgress).SnapshotType(fakeSnapshotType).Cancel(true).Result(), - expectedProcessed: false, - expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseCanceling).Result(), - expectedRequeue: ctrl.Result{}, + name: "Dataupload with not enabled cancel", + pod: builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Volumes(&corev1.Volume{Name: "dataupload-1"}).Result(), + du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseInProgress).SnapshotType(fakeSnapshotType).Cancel(false).Result(), + expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseInProgress).Result(), + expectedRequeue: ctrl.Result{}, + }, + { + name: "Dataupload should be cancel", + pod: builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Volumes(&corev1.Volume{Name: "dataupload-1"}).Result(), + du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseInProgress).SnapshotType(fakeSnapshotType).Cancel(true).Result(), + expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseCanceling).Result(), + expectedRequeue: ctrl.Result{}, }, { name: "Dataupload should be cancel with match node", @@ -449,19 +422,43 @@ func TestReconcile(t *testing.T) { du.Status.Node = "different_node" return du }(), - expectedProcessed: false, - expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseInProgress).Result(), - expectedRequeue: ctrl.Result{}, - notCreateFSBR: true, + expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseInProgress).Result(), + expectedRequeue: ctrl.Result{}, + notCreateFSBR: true, }, { - name: "runCancelableDataUpload is concurrent limited", - dataMgr: datapath.NewManager(0), + name: "runCancelableDataUpload is concurrent limited", + dataMgr: datapath.NewManager(0), + pod: builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Volumes(&corev1.Volume{Name: "dataupload-1"}).Result(), + du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhasePrepared).SnapshotType(fakeSnapshotType).Result(), + expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhasePrepared).Result(), + expectedRequeue: ctrl.Result{Requeue: true, RequeueAfter: time.Second * 5}, + }, + { + name: "data path init error", + pod: builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Volumes(&corev1.Volume{Name: "dataupload-1"}).Result(), + du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhasePrepared).SnapshotType(fakeSnapshotType).Result(), + fsBRInitErr: errors.New("fake-data-path-init-error"), + expectedProcessed: true, + expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseFailed).SnapshotType(fakeSnapshotType).Result(), + expectedErrMsg: "error initializing asyncBR: fake-data-path-init-error", + }, + { + name: "Unable to update status to in progress for data download", + pod: builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Volumes(&corev1.Volume{Name: "dataupload-1"}).Result(), + du: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhasePrepared).SnapshotType(fakeSnapshotType).Result(), + needErrs: []bool{false, false, false, true}, + expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhasePrepared).SnapshotType(fakeSnapshotType).Result(), + expectedRequeue: ctrl.Result{Requeue: true, RequeueAfter: time.Second * 5}, + }, + { + name: "data path start error", pod: builder.ForPod(velerov1api.DefaultNamespace, dataUploadName).Volumes(&corev1.Volume{Name: "dataupload-1"}).Result(), du: 
dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhasePrepared).SnapshotType(fakeSnapshotType).Result(), - expectedProcessed: false, - expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhasePrepared).Result(), - expectedRequeue: ctrl.Result{Requeue: true, RequeueAfter: time.Second * 5}, + fsBRStartErr: errors.New("fake-data-path-start-error"), + expectedProcessed: true, + expected: dataUploadBuilder().Phase(velerov2alpha1api.DataUploadPhaseFailed).SnapshotType(fakeSnapshotType).Result(), + expectedErrMsg: "error starting async backup for pod dataupload-1, volume dataupload-1: fake-data-path-start-error", }, { name: "prepare timeout", @@ -484,7 +481,6 @@ func TestReconcile(t *testing.T) { du.DeletionTimestamp = &metav1.Time{Time: time.Now()} return du }(), - expectedProcessed: false, checkFunc: func(du velerov2alpha1api.DataUpload) bool { return du.Spec.Cancel }, @@ -500,7 +496,6 @@ func TestReconcile(t *testing.T) { du.DeletionTimestamp = &metav1.Time{Time: time.Now()} return du }(), - expectedProcessed: false, checkFunc: func(du velerov2alpha1api.DataUpload) bool { return !controllerutil.ContainsFinalizer(&du, DataUploadDownloadFinalizer) }, @@ -559,12 +554,16 @@ func TestReconcile(t *testing.T) { du: test.du, kubeClient: r.client, clock: r.Clock, + initErr: test.fsBRInitErr, + startErr: test.fsBRStartErr, } } } + testCreateFsBR := false if test.du.Status.Phase == velerov2alpha1api.DataUploadPhaseInProgress && !test.notCreateFSBR { if fsBR := r.dataPathMgr.GetAsyncBR(test.du.Name); fsBR == nil { + testCreateFsBR = true _, err := r.dataPathMgr.CreateMicroServiceBRWatcher(ctx, r.client, nil, nil, datapath.TaskTypeBackup, test.du.Name, velerov1api.DefaultNamespace, "", "", "", datapath.Callbacks{OnCancelled: r.OnDataUploadCancelled}, false, velerotest.NewLogger()) require.NoError(t, err) } @@ -609,6 +608,10 @@ func TestReconcile(t *testing.T) { if test.checkFunc != nil { assert.True(t, test.checkFunc(du)) } + + if !testCreateFsBR && du.Status.Phase != velerov2alpha1api.DataUploadPhaseInProgress { + assert.Nil(t, r.dataPathMgr.GetAsyncBR(test.du.Name)) + } }) } } @@ -930,7 +933,7 @@ func TestTryCancelDataUpload(t *testing.T) { err = r.client.Create(ctx, test.dd) require.NoError(t, err) - r.TryCancelDataUpload(ctx, test.dd, "") + r.tryCancelAcceptedDataUpload(ctx, test.dd, "") if test.expectedErr == "" { assert.NoError(t, err) @@ -1124,7 +1127,7 @@ func TestAttemptDataUploadResume(t *testing.T) { funcResumeCancellableDataBackup = dt.resumeCancellableDataPath // Run the test - err = r.AttemptDataUploadResume(ctx, r.client, r.logger.WithField("name", test.name), test.du.Namespace) + err = r.AttemptDataUploadResume(ctx, r.logger.WithField("name", test.name), test.du.Namespace) if test.expectedError != "" { assert.EqualError(t, err, test.expectedError) diff --git a/pkg/controller/pod_volume_backup_controller.go b/pkg/controller/pod_volume_backup_controller.go index e626df2b35..548ab0d0c6 100644 --- a/pkg/controller/pod_volume_backup_controller.go +++ b/pkg/controller/pod_volume_backup_controller.go @@ -202,7 +202,7 @@ func (r *PodVolumeBackupReconciler) Reconcile(ctx context.Context, req ctrl.Requ } func (r *PodVolumeBackupReconciler) OnDataPathCompleted(ctx context.Context, namespace string, pvbName string, result datapath.Result) { - defer r.closeDataPath(ctx, pvbName) + defer r.dataPathMgr.RemoveAsyncBR(pvbName) log := r.logger.WithField("pvb", pvbName) @@ -240,7 +240,7 @@ func (r *PodVolumeBackupReconciler) OnDataPathCompleted(ctx context.Context, nam } func (r 
*PodVolumeBackupReconciler) OnDataPathFailed(ctx context.Context, namespace, pvbName string, err error) { - defer r.closeDataPath(ctx, pvbName) + defer r.dataPathMgr.RemoveAsyncBR(pvbName) log := r.logger.WithField("pvb", pvbName) @@ -255,7 +255,7 @@ func (r *PodVolumeBackupReconciler) OnDataPathFailed(ctx context.Context, namesp } func (r *PodVolumeBackupReconciler) OnDataPathCancelled(ctx context.Context, namespace string, pvbName string) { - defer r.closeDataPath(ctx, pvbName) + defer r.dataPathMgr.RemoveAsyncBR(pvbName) log := r.logger.WithField("pvb", pvbName) diff --git a/pkg/controller/pod_volume_restore_controller.go b/pkg/controller/pod_volume_restore_controller.go index 3f285789db..c4a3e7451f 100644 --- a/pkg/controller/pod_volume_restore_controller.go +++ b/pkg/controller/pod_volume_restore_controller.go @@ -265,7 +265,7 @@ func getInitContainerIndex(pod *corev1api.Pod) int { } func (c *PodVolumeRestoreReconciler) OnDataPathCompleted(ctx context.Context, namespace string, pvrName string, result datapath.Result) { - defer c.closeDataPath(ctx, pvrName) + defer c.dataPathMgr.RemoveAsyncBR(pvrName) log := c.logger.WithField("pvr", pvrName) @@ -325,7 +325,7 @@ func (c *PodVolumeRestoreReconciler) OnDataPathCompleted(ctx context.Context, na } func (c *PodVolumeRestoreReconciler) OnDataPathFailed(ctx context.Context, namespace string, pvrName string, err error) { - defer c.closeDataPath(ctx, pvrName) + defer c.dataPathMgr.RemoveAsyncBR(pvrName) log := c.logger.WithField("pvr", pvrName) @@ -340,7 +340,7 @@ func (c *PodVolumeRestoreReconciler) OnDataPathFailed(ctx context.Context, names } func (c *PodVolumeRestoreReconciler) OnDataPathCancelled(ctx context.Context, namespace string, pvrName string) { - defer c.closeDataPath(ctx, pvrName) + defer c.dataPathMgr.RemoveAsyncBR(pvrName) log := c.logger.WithField("pvr", pvrName) diff --git a/pkg/datamover/backup_micro_service.go b/pkg/datamover/backup_micro_service.go index cedacc0ce4..fa48b3523e 100644 --- a/pkg/datamover/backup_micro_service.go +++ b/pkg/datamover/backup_micro_service.go @@ -127,15 +127,13 @@ func (r *BackupMicroService) Init() error { return err } -var waitControllerTimeout time.Duration = time.Minute * 2 - func (r *BackupMicroService) RunCancelableDataPath(ctx context.Context) (string, error) { log := r.logger.WithFields(logrus.Fields{ "dataupload": r.dataUploadName, }) du := &velerov2alpha1api.DataUpload{} - err := wait.PollUntilContextTimeout(ctx, 500*time.Millisecond, waitControllerTimeout, true, func(ctx context.Context) (bool, error) { + err := wait.PollUntilContextCancel(ctx, 500*time.Millisecond, true, func(ctx context.Context) (bool, error) { err := r.client.Get(ctx, types.NamespacedName{ Namespace: r.namespace, Name: r.dataUploadName, @@ -241,8 +239,6 @@ func (r *BackupMicroService) Shutdown() { var funcMarshal = json.Marshal func (r *BackupMicroService) OnDataUploadCompleted(ctx context.Context, namespace string, duName string, result datapath.Result) { - defer r.closeDataPath(ctx, duName) - log := r.logger.WithField("dataupload", duName) backupBytes, err := funcMarshal(result.Backup) @@ -262,8 +258,6 @@ func (r *BackupMicroService) OnDataUploadCompleted(ctx context.Context, namespac } func (r *BackupMicroService) OnDataUploadFailed(ctx context.Context, namespace string, duName string, err error) { - defer r.closeDataPath(ctx, duName) - log := r.logger.WithField("dataupload", duName) log.WithError(err).Error("Async fs backup data path failed") @@ -274,8 +268,6 @@ func (r *BackupMicroService) 
OnDataUploadFailed(ctx context.Context, namespace s } func (r *BackupMicroService) OnDataUploadCancelled(ctx context.Context, namespace string, duName string) { - defer r.closeDataPath(ctx, duName) - log := r.logger.WithField("dataupload", duName) log.Warn("Async fs backup data path canceled") @@ -296,8 +288,6 @@ func (r *BackupMicroService) OnDataUploadProgress(ctx context.Context, namespace return } - log.Infof("Sending event for progress %v (%s)", progress, string(progressBytes)) - r.eventRecorder.Event(r.dataUpload, false, datapath.EventReasonProgress, string(progressBytes)) } @@ -313,7 +303,7 @@ func (r *BackupMicroService) closeDataPath(ctx context.Context, duName string) { func (r *BackupMicroService) cancelDataUpload(du *velerov2alpha1api.DataUpload) { r.logger.WithField("DataUpload", du.Name).Info("Data upload is being canceled") - r.eventRecorder.Event(du, false, "Canceling", "Canceling for data upload %s", du.Name) + r.eventRecorder.Event(du, false, datapath.EventReasonCancelling, "Canceling for data upload %s", du.Name) fsBackup := r.dataPathMgr.GetAsyncBR(du.Name) if fsBackup == nil { diff --git a/pkg/datamover/backup_micro_service_test.go b/pkg/datamover/backup_micro_service_test.go index 9db38d4c0d..cca428ee88 100644 --- a/pkg/datamover/backup_micro_service_test.go +++ b/pkg/datamover/backup_micro_service_test.go @@ -301,12 +301,12 @@ func TestRunCancelableDataPath(t *testing.T) { }{ { name: "no du", - ctx: context.Background(), + ctx: ctxTimeout, expectedErr: "error waiting for du: context deadline exceeded", }, { name: "du not in in-progress", - ctx: context.Background(), + ctx: ctxTimeout, kubeClientObj: []runtime.Object{du}, expectedErr: "error waiting for du: context deadline exceeded", }, @@ -412,8 +412,6 @@ func TestRunCancelableDataPath(t *testing.T) { return fsBR } - waitControllerTimeout = time.Second - if test.result != nil { go func() { time.Sleep(time.Millisecond * 500) diff --git a/pkg/datamover/restore_micro_service.go b/pkg/datamover/restore_micro_service.go index 508469701a..d0a4c6f50c 100644 --- a/pkg/datamover/restore_micro_service.go +++ b/pkg/datamover/restore_micro_service.go @@ -122,7 +122,7 @@ func (r *RestoreMicroService) RunCancelableDataPath(ctx context.Context) (string }) dd := &velerov2alpha1api.DataDownload{} - err := wait.PollUntilContextTimeout(ctx, 500*time.Millisecond, waitControllerTimeout, true, func(ctx context.Context) (bool, error) { + err := wait.PollUntilContextCancel(ctx, 500*time.Millisecond, true, func(ctx context.Context) (bool, error) { err := r.client.Get(ctx, types.NamespacedName{ Namespace: r.namespace, Name: r.dataDownloadName, @@ -214,8 +214,6 @@ func (r *RestoreMicroService) Shutdown() { } func (r *RestoreMicroService) OnDataDownloadCompleted(ctx context.Context, namespace string, ddName string, result datapath.Result) { - defer r.closeDataPath(ctx, ddName) - log := r.logger.WithField("datadownload", ddName) restoreBytes, err := funcMarshal(result.Restore) @@ -235,8 +233,6 @@ func (r *RestoreMicroService) OnDataDownloadCompleted(ctx context.Context, names } func (r *RestoreMicroService) OnDataDownloadFailed(ctx context.Context, namespace string, ddName string, err error) { - defer r.closeDataPath(ctx, ddName) - log := r.logger.WithField("datadownload", ddName) log.WithError(err).Error("Async fs restore data path failed") @@ -247,8 +243,6 @@ func (r *RestoreMicroService) OnDataDownloadFailed(ctx context.Context, namespac } func (r *RestoreMicroService) OnDataDownloadCancelled(ctx context.Context, namespace string, ddName 
string) { - defer r.closeDataPath(ctx, ddName) - log := r.logger.WithField("datadownload", ddName) log.Warn("Async fs restore data path canceled") @@ -284,7 +278,7 @@ func (r *RestoreMicroService) closeDataPath(ctx context.Context, ddName string) func (r *RestoreMicroService) cancelDataDownload(dd *velerov2alpha1api.DataDownload) { r.logger.WithField("DataDownload", dd.Name).Info("Data download is being canceled") - r.eventRecorder.Event(dd, false, "Canceling", "Canceling for data download %s", dd.Name) + r.eventRecorder.Event(dd, false, datapath.EventReasonCancelling, "Canceling for data download %s", dd.Name) fsBackup := r.dataPathMgr.GetAsyncBR(dd.Name) if fsBackup == nil { diff --git a/pkg/datamover/restore_micro_service_test.go b/pkg/datamover/restore_micro_service_test.go index e3ef8701da..8a3ed61e1f 100644 --- a/pkg/datamover/restore_micro_service_test.go +++ b/pkg/datamover/restore_micro_service_test.go @@ -254,12 +254,12 @@ func TestRunCancelableRestore(t *testing.T) { }{ { name: "no dd", - ctx: context.Background(), + ctx: ctxTimeout, expectedErr: "error waiting for dd: context deadline exceeded", }, { name: "dd not in in-progress", - ctx: context.Background(), + ctx: ctxTimeout, kubeClientObj: []runtime.Object{dd}, expectedErr: "error waiting for dd: context deadline exceeded", }, @@ -365,8 +365,6 @@ func TestRunCancelableRestore(t *testing.T) { return fsBR } - waitControllerTimeout = time.Second - if test.result != nil { go func() { time.Sleep(time.Millisecond * 500) diff --git a/pkg/datapath/file_system.go b/pkg/datapath/file_system.go index 762c91b188..5d3b54f281 100644 --- a/pkg/datapath/file_system.go +++ b/pkg/datapath/file_system.go @@ -18,6 +18,7 @@ package datapath import ( "context" + "sync" "github.com/pkg/errors" "github.com/sirupsen/logrus" @@ -66,6 +67,8 @@ type fileSystemBR struct { callbacks Callbacks jobName string requestorType string + wgDataPath sync.WaitGroup + dataPathLock sync.Mutex } func newFileSystemBR(jobName string, requestorType string, client client.Client, namespace string, callbacks Callbacks, log logrus.FieldLogger) AsyncBR { @@ -75,6 +78,7 @@ func newFileSystemBR(jobName string, requestorType string, client client.Client, client: client, namespace: namespace, callbacks: callbacks, + wgDataPath: sync.WaitGroup{}, log: log, } @@ -134,6 +138,23 @@ func (fs *fileSystemBR) Init(ctx context.Context, param interface{}) error { } func (fs *fileSystemBR) Close(ctx context.Context) { + if fs.cancel != nil { + fs.cancel() + } + + fs.log.WithField("user", fs.jobName).Info("Closing FileSystemBR") + + fs.wgDataPath.Wait() + + fs.close(ctx) + + fs.log.WithField("user", fs.jobName).Info("FileSystemBR is closed") +} + +func (fs *fileSystemBR) close(ctx context.Context) { + fs.dataPathLock.Lock() + defer fs.dataPathLock.Unlock() + if fs.uploaderProv != nil { if err := fs.uploaderProv.Close(ctx); err != nil { fs.log.Errorf("failed to close uploader provider with error %v", err) @@ -141,13 +162,6 @@ func (fs *fileSystemBR) Close(ctx context.Context) { fs.uploaderProv = nil } - - if fs.cancel != nil { - fs.cancel() - fs.cancel = nil - } - - fs.log.WithField("user", fs.jobName).Info("FileSystemBR is closed") } func (fs *fileSystemBR) StartBackup(source AccessPoint, uploaderConfig map[string]string, param interface{}) error { @@ -155,9 +169,18 @@ func (fs *fileSystemBR) StartBackup(source AccessPoint, uploaderConfig map[strin return errors.New("file system data path is not initialized") } + fs.wgDataPath.Add(1) + backupParam := param.(*FSBRStartParam) go func() { + 
fs.log.Info("Start data path backup") + + defer func() { + fs.close(context.Background()) + fs.wgDataPath.Done() + }() + snapshotID, emptySnapshot, err := fs.uploaderProv.RunBackup(fs.ctx, source.ByPath, backupParam.RealSource, backupParam.Tags, backupParam.ForceFull, backupParam.ParentSnapshot, source.VolMode, uploaderConfig, fs) @@ -182,7 +205,16 @@ func (fs *fileSystemBR) StartRestore(snapshotID string, target AccessPoint, uplo return errors.New("file system data path is not initialized") } + fs.wgDataPath.Add(1) + go func() { + fs.log.Info("Start data path restore") + + defer func() { + fs.close(context.Background()) + fs.wgDataPath.Done() + }() + err := fs.uploaderProv.RunRestore(fs.ctx, snapshotID, target.ByPath, target.VolMode, uploaderConfigs, fs) if err == provider.ErrorCanceled { diff --git a/pkg/datapath/file_system_test.go b/pkg/datapath/file_system_test.go index 85c6df08d9..fab33df1c0 100644 --- a/pkg/datapath/file_system_test.go +++ b/pkg/datapath/file_system_test.go @@ -96,6 +96,7 @@ func TestAsyncBackup(t *testing.T) { fs := newFileSystemBR("job-1", "test", nil, "velero", Callbacks{}, velerotest.NewLogger()).(*fileSystemBR) mockProvider := providerMock.NewProvider(t) mockProvider.On("RunBackup", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(test.result.Backup.SnapshotID, test.result.Backup.EmptySnapshot, test.err) + mockProvider.On("Close", mock.Anything).Return(nil) fs.uploaderProv = mockProvider fs.initialized = true fs.callbacks = test.callbacks @@ -179,6 +180,7 @@ func TestAsyncRestore(t *testing.T) { fs := newFileSystemBR("job-1", "test", nil, "velero", Callbacks{}, velerotest.NewLogger()).(*fileSystemBR) mockProvider := providerMock.NewProvider(t) mockProvider.On("RunRestore", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(test.err) + mockProvider.On("Close", mock.Anything).Return(nil) fs.uploaderProv = mockProvider fs.initialized = true fs.callbacks = test.callbacks diff --git a/pkg/datapath/micro_service_watcher.go b/pkg/datapath/micro_service_watcher.go index a5826459a4..d74ca2fc2c 100644 --- a/pkg/datapath/micro_service_watcher.go +++ b/pkg/datapath/micro_service_watcher.go @@ -46,11 +46,12 @@ const ( ErrCancelled = "data path is canceled" - EventReasonStarted = "Data-Path-Started" - EventReasonCompleted = "Data-Path-Completed" - EventReasonFailed = "Data-Path-Failed" - EventReasonCancelled = "Data-Path-Canceled" - EventReasonProgress = "Data-Path-Progress" + EventReasonStarted = "Data-Path-Started" + EventReasonCompleted = "Data-Path-Completed" + EventReasonFailed = "Data-Path-Failed" + EventReasonCancelled = "Data-Path-Canceled" + EventReasonProgress = "Data-Path-Progress" + EventReasonCancelling = "Data-Path-Canceling" ) type microServiceBRWatcher struct { @@ -76,6 +77,7 @@ type microServiceBRWatcher struct { podInformer ctrlcache.Informer eventHandler cache.ResourceEventHandlerRegistration podHandler cache.ResourceEventHandlerRegistration + watcherLock sync.Mutex } func newMicroServiceBRWatcher(client client.Client, kubeClient kubernetes.Interface, mgr manager.Manager, taskType string, taskName string, namespace string, @@ -101,8 +103,6 @@ func newMicroServiceBRWatcher(client client.Client, kubeClient kubernetes.Interf } func (ms *microServiceBRWatcher) Init(ctx context.Context, param interface{}) error { - succeeded := false - eventInformer, err := ms.mgr.GetCache().GetInformer(ctx, &v1.Event{}) if err != nil { 
return errors.Wrap(err, "error getting event informer") @@ -121,8 +121,6 @@ func (ms *microServiceBRWatcher) Init(ctx context.Context, param interface{}) er return } - ms.log.Infof("Pushed adding event %s/%s, message %s for object %v", evt.Namespace, evt.Name, evt.Message, evt.InvolvedObject) - ms.eventCh <- evt }, UpdateFunc: func(_, obj interface{}) { @@ -131,25 +129,14 @@ func (ms *microServiceBRWatcher) Init(ctx context.Context, param interface{}) er return } - ms.log.Infof("Pushed updating event %s/%s, message %s for object %v", evt.Namespace, evt.Name, evt.Message, evt.InvolvedObject) - ms.eventCh <- evt }, }, ) - if err != nil { return errors.Wrap(err, "error registering event handler") } - defer func() { - if !succeeded { - if err := eventInformer.RemoveEventHandler(eventHandler); err != nil { - ms.log.WithError(err).Warn("Failed to remove event handler") - } - } - }() - podHandler, err := podInformer.AddEventHandler( cache.ResourceEventHandlerFuncs{ UpdateFunc: func(_, obj interface{}) { @@ -164,25 +151,13 @@ func (ms *microServiceBRWatcher) Init(ctx context.Context, param interface{}) er }, }, ) - if err != nil { return errors.Wrap(err, "error registering pod handler") } - defer func() { - if !succeeded { - if err := podInformer.RemoveEventHandler(podHandler); err != nil { - ms.log.WithError(err).Warn("Failed to remove pod handler") - } - } - }() - - ms.log.WithFields( - logrus.Fields{ - "taskType": ms.taskType, - "taskName": ms.taskName, - "thisPod": ms.thisPod, - }).Info("MicroServiceBR is initialized") + if err := ms.reEnsureThisPod(ctx); err != nil { + return err + } ms.eventInformer = eventInformer ms.podInformer = podInformer @@ -191,7 +166,12 @@ func (ms *microServiceBRWatcher) Init(ctx context.Context, param interface{}) er ms.ctx, ms.cancel = context.WithCancel(ctx) - succeeded = true + ms.log.WithFields( + logrus.Fields{ + "taskType": ms.taskType, + "taskName": ms.taskName, + "thisPod": ms.thisPod, + }).Info("MicroServiceBR is initialized") return nil } @@ -199,34 +179,40 @@ func (ms *microServiceBRWatcher) Init(ctx context.Context, param interface{}) er func (ms *microServiceBRWatcher) Close(ctx context.Context) { if ms.cancel != nil { ms.cancel() - ms.cancel = nil } ms.log.WithField("taskType", ms.taskType).WithField("taskName", ms.taskName).Info("Closing MicroServiceBR") ms.wgWatcher.Wait() - if ms.eventInformer != nil && ms.eventHandler != nil { + ms.close() + + ms.log.WithField("taskType", ms.taskType).WithField("taskName", ms.taskName).Info("MicroServiceBR is closed") +} + +func (ms *microServiceBRWatcher) close() { + ms.watcherLock.Lock() + defer ms.watcherLock.Unlock() + + if ms.eventHandler != nil { if err := ms.eventInformer.RemoveEventHandler(ms.eventHandler); err != nil { ms.log.WithError(err).Warn("Failed to remove event handler") } + + ms.eventHandler = nil } - if ms.podInformer != nil && ms.podHandler != nil { + if ms.podHandler != nil { if err := ms.podInformer.RemoveEventHandler(ms.podHandler); err != nil { ms.log.WithError(err).Warn("Failed to remove pod handler") } - } - ms.log.WithField("taskType", ms.taskType).WithField("taskName", ms.taskName).Info("MicroServiceBR is closed") + ms.podHandler = nil + } } func (ms *microServiceBRWatcher) StartBackup(source AccessPoint, uploaderConfig map[string]string, param interface{}) error { - ms.log.Infof("Start watching backup ms for source %v", source) - - if err := ms.reEnsureThisPod(); err != nil { - return err - } + ms.log.Infof("Start watching backup ms for source %v", source.ByPath) ms.startWatch() @@ 
-234,20 +220,16 @@ func (ms *microServiceBRWatcher) StartBackup(source AccessPoint, uploaderConfig } func (ms *microServiceBRWatcher) StartRestore(snapshotID string, target AccessPoint, uploaderConfigs map[string]string) error { - ms.log.Infof("Start watching restore ms to target %v, from snapshot %s", target, snapshotID) - - if err := ms.reEnsureThisPod(); err != nil { - return err - } + ms.log.Infof("Start watching restore ms to target %s, from snapshot %s", target.ByPath, snapshotID) ms.startWatch() return nil } -func (ms *microServiceBRWatcher) reEnsureThisPod() error { +func (ms *microServiceBRWatcher) reEnsureThisPod(ctx context.Context) error { thisPod := &v1.Pod{} - if err := ms.client.Get(ms.ctx, types.NamespacedName{ + if err := ms.client.Get(ctx, types.NamespacedName{ Namespace: ms.namespace, Name: ms.thisPod, }, thisPod); err != nil { @@ -275,6 +257,11 @@ func (ms *microServiceBRWatcher) startWatch() { go func() { ms.log.Info("Start watching data path pod") + defer func() { + ms.close() + ms.wgWatcher.Done() + }() + var lastPod *v1.Pod watchLoop: @@ -291,14 +278,16 @@ func (ms *microServiceBRWatcher) startWatch() { } if lastPod == nil { - ms.log.Warn("Data path pod watch loop is canceled") - ms.wgWatcher.Done() + ms.log.Warn("Watch loop is canceled on waiting data path pod") return } epilogLoop: for !ms.startedFromEvent || !ms.terminatedFromEvent { select { + case <-ms.ctx.Done(): + ms.log.Warn("Watch loop is canceled on waiting final event") + return case <-time.After(eventWaitTimeout): break epilogLoop case evt := <-ms.eventCh: @@ -339,8 +328,6 @@ func (ms *microServiceBRWatcher) startWatch() { } logger.Info("Complete callback on data path pod termination") - - ms.wgWatcher.Done() }() } @@ -348,20 +335,22 @@ func (ms *microServiceBRWatcher) onEvent(evt *v1.Event) { switch evt.Reason { case EventReasonStarted: ms.startedFromEvent = true - ms.log.Infof("Received data path start message %s", evt.Message) + ms.log.Infof("Received data path start message: %s", evt.Message) case EventReasonProgress: ms.callbacks.OnProgress(ms.ctx, ms.namespace, ms.taskName, funcGetProgressFromMessage(evt.Message, ms.log)) case EventReasonCompleted: - ms.log.Infof("Received data path completed message %v", funcGetResultFromMessage(ms.taskType, evt.Message, ms.log)) + ms.log.Infof("Received data path completed message: %v", funcGetResultFromMessage(ms.taskType, evt.Message, ms.log)) ms.terminatedFromEvent = true case EventReasonCancelled: - ms.log.Infof("Received data path canceled message %s", evt.Message) + ms.log.Infof("Received data path canceled message: %s", evt.Message) ms.terminatedFromEvent = true case EventReasonFailed: - ms.log.Infof("Received data path failed message %s", evt.Message) + ms.log.Infof("Received data path failed message: %s", evt.Message) ms.terminatedFromEvent = true + case EventReasonCancelling: + ms.log.Infof("Received data path canceling message: %s", evt.Message) default: - ms.log.Debugf("Received event for data mover %s.[reason %s, message %s]", ms.taskName, evt.Reason, evt.Message) + ms.log.Infof("Received event for data path %s, reason: %s, message: %s", ms.taskName, evt.Reason, evt.Message) } } diff --git a/pkg/datapath/micro_service_watcher_test.go b/pkg/datapath/micro_service_watcher_test.go index f10f6b3310..f926363e07 100644 --- a/pkg/datapath/micro_service_watcher_test.go +++ b/pkg/datapath/micro_service_watcher_test.go @@ -102,7 +102,7 @@ func TestReEnsureThisPod(t *testing.T) { log: velerotest.NewLogger(), } - err := ms.reEnsureThisPod() + err := 
ms.reEnsureThisPod(context.Background()) if test.expectErr != "" { assert.EqualError(t, err, test.expectErr) } else { diff --git a/pkg/exposer/csi_snapshot.go b/pkg/exposer/csi_snapshot.go index cd0450ad57..ccdb70dd28 100644 --- a/pkg/exposer/csi_snapshot.go +++ b/pkg/exposer/csi_snapshot.go @@ -172,7 +172,10 @@ func (e *csiSnapshotExposer) Expose(ctx context.Context, ownerObject corev1.Obje backupPVCStorageClass := csiExposeParam.StorageClass backupPVCReadOnly := false if value, exists := csiExposeParam.BackupPVCConfig[csiExposeParam.StorageClass]; exists { - backupPVCStorageClass = value.StorageClass + if value.StorageClass != "" { + backupPVCStorageClass = value.StorageClass + } + backupPVCReadOnly = value.ReadOnly } diff --git a/site/content/docs/main/backup-repository-configuration.md b/site/content/docs/main/backup-repository-configuration.md index 0e1d6343cd..8d33b0176e 100644 --- a/site/content/docs/main/backup-repository-configuration.md +++ b/site/content/docs/main/backup-repository-configuration.md @@ -14,7 +14,7 @@ Conclusively, you have two ways to add/change/delete configurations of a backup - If the BackupRepository CR for the backup repository is already there, you should modify the `repositoryConfig` field. The new changes will be applied to the backup repository at the due time, it doesn't require Velero server to restart. - Otherwise, you can create the backup repository configMap as a template for the BackupRepository CRs that are going to be created. -The backup repository configMap is repository type specified, so for one repository type, you only need to create one set of configurations, they will be applied to all BackupRepository CRs of the same type. Whereas, the changes of `repositoryConfig` field apply to the specific BackupRepository CR only, you may need to change every BackupRepository CR of the same type. +The backup repository configMap is repository type (i.e., kopia, restic) specific, so for one repository type you only need to create one set of configurations, and they will be applied to all BackupRepository CRs of the same type. By contrast, changes to the `repositoryConfig` field apply only to the specific BackupRepository CR, so you may need to change every BackupRepository CR of the same type. Below is an example of the BackupRepository configMap with the configurations: ```yaml diff --git a/site/content/docs/main/csi-snapshot-data-movement.md b/site/content/docs/main/csi-snapshot-data-movement.md index 621997c53f..62480a1e36 100644 --- a/site/content/docs/main/csi-snapshot-data-movement.md +++ b/site/content/docs/main/csi-snapshot-data-movement.md @@ -481,7 +481,7 @@ For Velero built-in data mover, Velero uses [BestEffort as the QoS][13] for node If you want to constraint the CPU/memory usage, you need to [customize the resource limits][11]. The CPU/memory consumption is always related to the scale of data to be backed up/restored, refer to [Performance Guidance][12] for more details, so it is highly recommended that you perform your own testing to find the best resource limits for your data. During the restore, the repository may also cache data/metadata so as to reduce the network footprint and speed up the restore. The repository uses its own policy to store and clean up the cache. -For Kopia repository, the cache is stored in the node-agent pod's root file system. Velero allows you to configure a limit of the cache size so that the data mover pod won't be evicted due to running out of the ephemeral storage.
For more details, check [Backup Repository Configuration][16]. +For Kopia repository, the cache is stored in the node-agent pod's root file system. Velero allows you to configure a limit of the cache size so that the data mover pod won't be evicted due to running out of the ephemeral storage. For more details, check [Backup Repository Configuration][17]. ### Node Selection @@ -491,8 +491,11 @@ For Velero built-in data mover, it uses Kubernetes' scheduler to mount a snapsho For the backup, you can intervene this scheduling process through [Data Movement Backup Node Selection][15], so that you can decide which node(s) should/should not run the data movement backup for various purposes. For the restore, this is not supported because sometimes the data movement restore must run in the same node where the restored workload pod is scheduled. +### BackupPVC Configuration - +The `BackupPVC` serves as an intermediate Persistent Volume Claim (PVC) used during data movement backup operations, providing efficient access to data. +In complex storage environments, optimizing `BackupPVC` configurations can significantly enhance the performance of backup operations. [This document][16] outlines +advanced configuration options for `BackupPVC`, allowing users to fine-tune access modes and storage class settings based on their storage provider's capabilities. [1]: https://github.com/vmware-tanzu/velero/pull/5968 [2]: csi.md @@ -509,4 +512,5 @@ For the restore, this is not supported because sometimes the data movement resto [13]: https://kubernetes.io/docs/concepts/workloads/pods/pod-qos/ [14]: node-agent-concurrency.md [15]: data-movement-backup-node-selection.md -[16]: backup-repository-configuration.md +[16]: data-movement-backup-pvc-configuration.md +[17]: backup-repository-configuration.md diff --git a/site/content/docs/main/data-movement-backup-pvc-configuration.md b/site/content/docs/main/data-movement-backup-pvc-configuration.md new file mode 100644 index 0000000000..61bbf53e14 --- /dev/null +++ b/site/content/docs/main/data-movement-backup-pvc-configuration.md @@ -0,0 +1,54 @@ +--- +title: "BackupPVC Configuration for Data Movement Backup" +layout: docs +--- + +`BackupPVC` is an intermediate PVC from which data is read during the data movement backup operation. + +In some scenarios, users may need to configure advanced options of the backupPVC so that the data movement backup +operation performs better. Specifically: +- For some storage providers, creating a read-only volume from a snapshot is very fast, whereas creating a writable volume + from the snapshot requires cloning the entire disk data, which is time consuming. If the `backupPVC`'s `accessModes` is + set to `ReadOnlyMany`, the volume driver can tell the storage to create a read-only volume, which may dramatically shorten the + snapshot expose time. On the other hand, `ReadOnlyMany` is not supported by all volumes. Therefore, users should be allowed to configure + the `accessModes` for the `backupPVC`. +- Some storage providers create one or more replicas when creating a volume; the number of replicas is defined in the storage class. + However, it doesn't make sense to keep replicas for an intermediate volume that is used only by the backup. Therefore, users should be allowed + to configure another storage class specifically used by the `backupPVC`.
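The doc above explains why a per-storage-class override of the backupPVC's storage class and access mode can shorten the snapshot expose time. As a rough illustration of that behavior (a sketch only, not Velero's implementation; the `BackupPVC` struct and `resolveBackupPVC` helper below are stand-ins for the exposer's real types), the resolution could mirror the csi_snapshot.go hunk earlier in this diff:

```go
// Sketch: resolve the effective storage class and access mode for the backupPVC
// from a per-storage-class override map, falling back to the source PVC's
// storage class and ReadWriteOnce when nothing is configured.
package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
)

// BackupPVC mirrors the per-storage-class settings described in the docs:
// an optional storage class override and a read-only flag.
type BackupPVC struct {
	StorageClass string
	ReadOnly     bool
}

func resolveBackupPVC(sourceSC string, cfg map[string]BackupPVC) (string, corev1.PersistentVolumeAccessMode) {
	storageClass := sourceSC
	accessMode := corev1.ReadWriteOnce

	if value, exists := cfg[sourceSC]; exists {
		// Only override the storage class when one is actually configured.
		if value.StorageClass != "" {
			storageClass = value.StorageClass
		}
		if value.ReadOnly {
			accessMode = corev1.ReadOnlyMany
		}
	}

	return storageClass, accessMode
}

func main() {
	cfg := map[string]BackupPVC{
		"storage-class-1": {StorageClass: "backupPVC-storage-class", ReadOnly: true},
		"storage-class-3": {ReadOnly: true},
	}

	sc, mode := resolveBackupPVC("storage-class-1", cfg)
	fmt.Println(sc, mode) // backupPVC-storage-class ReadOnlyMany
}
```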
+ +Velero introduces a new section in the node agent configuration configMap (the name of this configMap is passed using the `--node-agent-config` velero server argument) +called `backupPVC`, through which you can specify the following +configurations: + +- `storageClass`: This specifies the storage class to be used for the backupPVC. If this value does not exist or is empty, then by +default the source PVC's storage class will be used. + +- `readOnly`: This is a boolean value. If set to `true`, then `ReadOnlyMany` will be the only value set in the backupPVC's access modes. Otherwise +the `ReadWriteOnce` value will be used. + +A sample of the `backupPVC` config as part of the configMap looks like this: +```json +{ + "backupPVC": { + "storage-class-1": { + "storageClass": "backupPVC-storage-class", + "readOnly": true + }, + "storage-class-2": { + "storageClass": "backupPVC-storage-class" + }, + "storage-class-3": { + "readOnly": true + } + } +} +``` + +**Note:** +- Users should make sure that the storage class specified in the `backupPVC` config exists in the cluster and can be used by the +`backupPVC`; otherwise the corresponding DataUpload CR will stay in the `Accepted` phase until timeout (the data movement prepare timeout is 30m by default). +- If users set the `readOnly` value to `true` in the `backupPVC` config, then they must also make sure that the storage class used for the +`backupPVC` supports creating a `ReadOnlyMany` PVC from a snapshot; otherwise the corresponding DataUpload CR will stay in the `Accepted` phase until +timeout (the data movement prepare timeout is 30m by default). +- If any of the above problems occur, the DataUpload CR is `canceled` after timeout, the backupPod and backupPVC are deleted, and the backup +will be marked as `PartiallyFailed`. diff --git a/site/data/docs/main-toc.yml b/site/data/docs/main-toc.yml index cab5468237..ec5f046043 100644 --- a/site/data/docs/main-toc.yml +++ b/site/data/docs/main-toc.yml @@ -52,7 +52,9 @@ toc: - page: CSI Snapshot Data Movement url: /csi-snapshot-data-movement - page: Node-agent Concurrency - url: /node-agent-concurrency + url: /node-agent-concurrency + - page: Backup PVC Configuration + url: /data-movement-backup-pvc-configuration - page: Verifying Self-signed Certificates url: /self-signed-certificates - page: Changing RBAC permissions diff --git a/test/Makefile b/test/Makefile index ff62230b32..fce0b9d8ee 100644 --- a/test/Makefile +++ b/test/Makefile @@ -167,7 +167,7 @@ run-e2e: ginkgo (echo "Cloud provider for target cloud/plugin provider is required, please rerun with CLOUD_PROVIDER="; exit 1) @$(GINKGO) run \ -v \ - --junit-report report.xml \ + --junit-report e2e/report.xml \ --label-filter="$(GINKGO_LABELS)" \ --timeout=5h \ ./e2e \ @@ -207,7 +207,7 @@ run-perf: ginkgo (echo "Cloud provider for target cloud/plugin provider is required, please rerun with CLOUD_PROVIDER="; exit 1) @$(GINKGO) run \ -v \ - --junit-report report.xml \ + --junit-report perf/report.xml \ --label-filter="$(GINKGO_LABELS)" \ --timeout=5h \ ./perf \ diff --git a/test/e2e/e2e_suite_test.go b/test/e2e/e2e_suite_test.go index 0d22510bc1..1031958084 100644 --- a/test/e2e/e2e_suite_test.go +++ b/test/e2e/e2e_suite_test.go @@ -113,7 +113,7 @@ func init() { // cases can be executed as expected successful result.
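The notes in the new doc warn that a storage class named in the `backupPVC` config must exist and be usable, or the DataUpload stays in the `Accepted` phase until the prepare timeout. Below is a hedged sketch of a pre-flight check for that condition; the `nodeAgentConfig`/`backupPVC` types and the `checkBackupPVCConfig` helper are illustrative assumptions, not part of Velero's code:

```go
// Sketch: parse the "backupPVC" section of a node-agent config document and
// verify that every referenced storage class exists in the cluster.
package main

import (
	"context"
	"encoding/json"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/kubernetes/fake"
)

type backupPVC struct {
	StorageClass string `json:"storageClass,omitempty"`
	ReadOnly     bool   `json:"readOnly,omitempty"`
}

type nodeAgentConfig struct {
	BackupPVC map[string]backupPVC `json:"backupPVC,omitempty"`
}

// checkBackupPVCConfig unmarshals the config JSON and verifies that every
// storage class named as an override actually exists in the cluster.
func checkBackupPVCConfig(ctx context.Context, client kubernetes.Interface, raw string) error {
	cfg := nodeAgentConfig{}
	if err := json.Unmarshal([]byte(raw), &cfg); err != nil {
		return fmt.Errorf("error unmarshalling node-agent config: %w", err)
	}

	for sourceSC, pvc := range cfg.BackupPVC {
		if pvc.StorageClass == "" {
			continue // falls back to the source PVC's storage class
		}
		if _, err := client.StorageV1().StorageClasses().Get(ctx, pvc.StorageClass, metav1.GetOptions{}); err != nil {
			return fmt.Errorf("backupPVC storage class %q (for %q) not usable: %w", pvc.StorageClass, sourceSC, err)
		}
	}

	return nil
}

func main() {
	raw := `{"backupPVC":{"storage-class-1":{"storageClass":"backupPVC-storage-class","readOnly":true}}}`
	// fake.NewSimpleClientset() gives an empty cluster, so the check reports the missing storage class.
	if err := checkBackupPVCConfig(context.Background(), fake.NewSimpleClientset(), raw); err != nil {
		fmt.Println(err)
	}
}
```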
var _ = Describe("Velero tests with various CRD API group versions", - Label("APIGroup", "APIVersion", "SKIP_KIND"), APIGroupVersionsTest) + Label("APIGroup", "APIVersion", "SKIP_KIND", "LongTime"), APIGroupVersionsTest) var _ = Describe("CRD of apiextentions v1beta1 should be B/R successfully from cluster(k8s version < 1.22) to cluster(k8s version >= 1.22)", Label("APIGroup", "APIExtensions", "SKIP_KIND"), APIExtensionsVersionsTest) @@ -197,9 +197,9 @@ var _ = Describe("Backups in object storage are synced to a new Velero and delet var _ = Describe("Backup will be created periodically by schedule defined by a Cron expression", Label("Schedule", "BR", "Pause", "LongTime"), ScheduleBackupTest) var _ = Describe("Backup resources should follow the specific order in schedule", - Label("Schedule", "OrderedResources"), ScheduleOrderedResources) + Label("Schedule", "OrderedResources", "LongTime"), ScheduleOrderedResources) var _ = Describe("Schedule controller wouldn't create a new backup when it still has pending or InProgress backup", - Label("Schedule", "BackupCreation", "SKIP_KIND"), ScheduleBackupCreationTest) + Label("Schedule", "BackupCreation", "SKIP_KIND", "LongTime"), ScheduleBackupCreationTest) var _ = Describe("Velero test on ssr object when controller namespace mix-ups", Label("PrivilegesMgmt", "SSR"), SSRTest)