Skip to content

Commit

Permalink
Implement transfer
Browse files Browse the repository at this point in the history
  • Loading branch information
mkimuram committed May 3, 2021
1 parent bbc825a commit 11b8ed0
Show file tree
Hide file tree
Showing 3 changed files with 379 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -639,6 +639,11 @@ func (adc *attachDetachController) syncPVCByKey(key string) error {
return nil
}

if volumeutil.IsTransferInProgress(pvc) || volumeutil.IsReceiveInProgress(pvc) {
// Skip attach/detach for transferring/receiving PVCs.
return nil
}

objs, err := adc.podIndexer.ByIndex(common.PodPVCIndex, key)
if err != nil {
return err
Expand Down
226 changes: 220 additions & 6 deletions pkg/controller/volume/persistentvolume/pv_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,38 @@ func (ctrl *PersistentVolumeController) syncUnboundClaim(claim *v1.PersistentVol
return err
}

// Receive transferred volume
if claim.Spec.Transfer != nil && claim.Spec.Transfer.Source != nil {
sourceClaim, err := ctrl.kubeClient.CoreV1().PersistentVolumeClaims(claim.Spec.Transfer.Source.Namespace).Get(context.TODO(), claim.Spec.Transfer.Source.Name, metav1.GetOptions{})
if err != nil {
return err
}
// Check if transfer source is ready to transfer
if !util.IsTransferInProgress(sourceClaim) {
// Transfer source isn't ready to transfer,
// so can't start receiving PV.
return nil
}

if util.IsReceiveInProgress(claim) {
if claim.Spec.VolumeName == "" {
// Force bind the PV to the claim
if err := ctrl.forceBindClaimToVolume(claim, sourceClaim.Spec.VolumeName); err != nil {
return err
}
}
// syncBoundClaim will handle rest of the process
return nil
}

// Mark the claim as ReceiveInProgress and try to recieve in the next
// periodic syncClaim
if _, err := util.MarkReceiveInProgress(claim, ctrl.kubeClient); err != nil {
return err
}
return nil
}

// [Unit test set 1]
volume, err := ctrl.volumes.findBestMatchForClaim(claim, delayBinding)
if err != nil {
Expand Down Expand Up @@ -478,6 +510,20 @@ func (ctrl *PersistentVolumeController) syncBoundClaim(claim *v1.PersistentVolum
// OBSERVATION: pvc is not "Pending"
// [Unit test set 3]
if claim.Spec.VolumeName == "" {
if claim.Spec.Transfer != nil && claim.Spec.Transfer.Destination != nil {
switch {
case util.IsTransferInProgress(claim):
// Still TransferInProgress
return nil
case util.IsTransferFinished(claim):
if _, err := ctrl.updateClaimStatusWithEvent(claim, v1.ClaimLost, nil, v1.EventTypeNormal, "ClaimLost", "Bound PersistentVolume has been transfered. This PersistentVolumeClaim can safely be deleted."); err != nil {
return err
}
return nil
default:
// Unexpected state with Transfer.Destination set. Fall through to error.
}
}
// Claim was bound before but not any more.
if _, err := ctrl.updateClaimStatusWithEvent(claim, v1.ClaimLost, nil, v1.EventTypeWarning, "ClaimLost", "Bound claim has lost reference to PersistentVolume. Data on the volume is lost!"); err != nil {
return err
Expand Down Expand Up @@ -513,6 +559,39 @@ func (ctrl *PersistentVolumeController) syncBoundClaim(claim *v1.PersistentVolum
}
return nil
} else if volume.Spec.ClaimRef.UID == claim.UID {
if claim.Spec.Transfer != nil && claim.Spec.Transfer.Destination != nil {
if util.IsTransferInProgress(claim) {
// Already marked as TransferInProgress
return nil
}
// Check if claim is in-use
_, inUse, err := ctrl.isClaimUsed(claim.Namespace, claim.Name)
if err != nil {
return err
}
if inUse {
// Claim is still in-use
return nil
}
// This claim is ready to be transferred, so marking as TransferInProgress
if _, err := util.MarkTransferInProgress(claim, ctrl.kubeClient); err != nil {
return err
}
return nil
}

if claim.Spec.Transfer != nil && claim.Spec.Transfer.Source != nil {
switch {
case util.IsReceiveInProgress(claim):
if err := ctrl.receiveTransferredPV(claim); err != nil {
return err
}
return nil
default:
// Unexpected state with Transfer.Source set. Fall through to error.
}
}

// All is well
// NOTE: syncPV can handle this so it can be left out.
// NOTE: bind() call here will do nothing in most cases as
Expand All @@ -524,6 +603,18 @@ func (ctrl *PersistentVolumeController) syncBoundClaim(claim *v1.PersistentVolum
}
return nil
} else {
if claim.Spec.Transfer != nil && claim.Spec.Transfer.Source != nil {
switch {
case util.IsReceiveInProgress(claim):
if err := ctrl.receiveTransferredPV(claim); err != nil {
return err
}
return nil
default:
// Unexpected state with Transfer.Source set. Fall through to error.
}
}

// Claim is bound but volume has a different claimant.
// Set the claim phase to 'Lost', which is a terminal
// phase.
Expand Down Expand Up @@ -1358,13 +1449,10 @@ func (ctrl *PersistentVolumeController) findPodsByPVCKey(key string) ([]*v1.Pod,
return pods, err
}

// isVolumeUsed returns list of active pods that use given PV.
func (ctrl *PersistentVolumeController) isVolumeUsed(pv *v1.PersistentVolume) ([]string, bool, error) {
if pv.Spec.ClaimRef == nil {
return nil, false, nil
}
// isClaimUsed returns list of active pods that use given PVC.
func (ctrl *PersistentVolumeController) isClaimUsed(claimNs, claimName string) ([]string, bool, error) {
podNames := sets.NewString()
pvcKey := fmt.Sprintf("%s/%s", pv.Spec.ClaimRef.Namespace, pv.Spec.ClaimRef.Name)
pvcKey := fmt.Sprintf("%s/%s", claimNs, claimName)
pods, err := ctrl.findPodsByPVCKey(pvcKey)
if err != nil {
return nil, false, fmt.Errorf("error finding pods by pvc %q: %s", pvcKey, err)
Expand All @@ -1378,6 +1466,15 @@ func (ctrl *PersistentVolumeController) isVolumeUsed(pv *v1.PersistentVolume) ([
return podNames.List(), podNames.Len() != 0, nil
}

// isVolumeUsed returns list of active pods that use given PV.
func (ctrl *PersistentVolumeController) isVolumeUsed(pv *v1.PersistentVolume) ([]string, bool, error) {
if pv.Spec.ClaimRef == nil {
return nil, false, nil
}

return ctrl.isClaimUsed(pv.Spec.ClaimRef.Namespace, pv.Spec.ClaimRef.Name)
}

// findNonScheduledPodsByPVC returns list of non-scheduled active pods that reference given PVC.
func (ctrl *PersistentVolumeController) findNonScheduledPodsByPVC(pvc *v1.PersistentVolumeClaim) ([]string, error) {
pvcKey := fmt.Sprintf("%s/%s", pvc.Namespace, pvc.Name)
Expand Down Expand Up @@ -1884,3 +1981,120 @@ func (ctrl *PersistentVolumeController) getProvisionerName(plugin vol.Provisiona
}
return storageClass.Provisioner
}

// receiveTransferredPV handles the actual transfer/receive logic
func (ctrl *PersistentVolumeController) receiveTransferredPV(claim *v1.PersistentVolumeClaim) error {
// Get SourceClaim
sourceClaim, err := ctrl.kubeClient.CoreV1().PersistentVolumeClaims(claim.Spec.Transfer.Source.Namespace).Get(context.TODO(), claim.Spec.Transfer.Source.Name, metav1.GetOptions{})
if err != nil {
return err
}

if sourceClaim.Spec.VolumeName != "" {
// sourceClaim is still referencing the PV,
// so remove refrence from the sourceClaim to the PV
if err := ctrl.unbindClaimFromVolume(sourceClaim); err != nil {
return err
}
return nil
} else {
// Get the received volume
volume, err := ctrl.kubeClient.CoreV1().PersistentVolumes().Get(context.TODO(), claim.Spec.VolumeName, metav1.GetOptions{})
if err != nil {
return err
}

if volume.Spec.ClaimRef != nil &&
volume.Spec.ClaimRef.Namespace == sourceClaim.Namespace &&
volume.Spec.ClaimRef.Name == sourceClaim.Name {
// Still bound to sourceClaim, so unbind the PV from sourceClaim
if err := ctrl.unbindVolume(volume); err != nil {
return err
}
}

// TODO: Volume needs to be modified here, before it is bound to destination PVC.
// For example, CSI secrets may referencing source PVC name and namespace.

// Bind volume to claim
if err = ctrl.bind(volume, claim); err != nil {
return err
}

// Mark sourceClaim as MarkTransferFinished
if _, err := util.MarkTransferFinished(sourceClaim, ctrl.kubeClient); err != nil {
return err
}

// Mark claim as MarkReceiveFinished
if _, err := util.MarkReceiveFinished(claim, ctrl.kubeClient); err != nil {
return err
}
}

return nil
}

// forceBindClaim force update binding information of the claim,
// it updates even volume is binded to another claim.
func (ctrl *PersistentVolumeController) forceBindClaimToVolume(claim *v1.PersistentVolumeClaim, volumeName string) error {
klog.V(4).Infof("force updating PersistentVolumeClaim[%s]: binding to %q", claimToClaimKey(claim), volumeName)

// The claim from method args can be pointing to watcher cache. We must not
// modify these, therefore create a copy.
claimClone := claim.DeepCopy()

// Bind the claim to the volume
claimClone.Spec.VolumeName = volumeName

// Set AnnBoundByController if it is not set yet
if !metav1.HasAnnotation(claimClone.ObjectMeta, pvutil.AnnBoundByController) {
metav1.SetMetaDataAnnotation(&claimClone.ObjectMeta, pvutil.AnnBoundByController, "yes")
}

// Set AnnBindCompleted if it is not set yet
if !metav1.HasAnnotation(claimClone.ObjectMeta, pvutil.AnnBindCompleted) {
metav1.SetMetaDataAnnotation(&claimClone.ObjectMeta, pvutil.AnnBindCompleted, "yes")
}

klog.V(2).Infof("volume %q force bound to claim %q", volumeName, claimToClaimKey(claim))
newClaim, err := ctrl.kubeClient.CoreV1().PersistentVolumeClaims(claim.Namespace).Update(context.TODO(), claimClone, metav1.UpdateOptions{})
if err != nil {
klog.V(4).Infof("force updating PersistentVolumeClaim[%s]: binding to %q failed: %v", claimToClaimKey(claim), volumeName, err)
return err
}

_, err = ctrl.storeClaimUpdate(newClaim)
if err != nil {
klog.V(4).Infof("force updating PersistentVolumeClaim[%s]: cannot update internal cache: %v", claimToClaimKey(claim), err)
return err
}

return nil
}

// unbindClaim delete binding information from the claim
func (ctrl *PersistentVolumeController) unbindClaimFromVolume(claim *v1.PersistentVolumeClaim) error {
// The claim from method args can be pointing to watcher cache. We must not
// modify these, therefore create a copy.
claimClone := claim.DeepCopy()
volumeName := claimClone.Spec.VolumeName
klog.V(4).Infof("updating PersistentVolumeClaim[%s]: unbinding from %q", claimToClaimKey(claim), volumeName)
// Delete the volume information from the claim
claimClone.Spec.VolumeName = ""

klog.V(2).Infof("volume %q unbound from claim %q", volumeName, claimToClaimKey(claim))
newClaim, err := ctrl.kubeClient.CoreV1().PersistentVolumeClaims(claim.Namespace).Update(context.TODO(), claimClone, metav1.UpdateOptions{})
if err != nil {
klog.V(4).Infof("updating PersistentVolumeClaim[%s]: unbinding from %q failed: %v", claimToClaimKey(claim), volumeName, err)
return err
}

_, err = ctrl.storeClaimUpdate(newClaim)
if err != nil {
klog.V(4).Infof("updating PersistentVolumeClaim[%s]: cannot update internal cache: %v", claimToClaimKey(claim), err)
return err
}

return nil
}
Loading

0 comments on commit 11b8ed0

Please sign in to comment.