Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

issue-464:Added support for JvmOption customization #462

Merged
merged 9 commits into from
Oct 20, 2020
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 67 additions & 1 deletion pkg/apis/pravega/v1beta1/pravegacluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -889,7 +889,15 @@ func (p *PravegaCluster) ValidateCreate() error {
// ValidateUpdate implements webhook.Validator so a webhook will be registered for the type
func (p *PravegaCluster) ValidateUpdate(old runtime.Object) error {
log.Printf("validate update %s", p.Name)
return p.validatePravegaVersion()
err := p.validatePravegaVersion()
if err != nil {
return err
}
err = p.validateConfigMap()
if err != nil {
return err
}
return nil
}

// ValidateDelete implements webhook.Validator so a webhook will be registered for the type
Expand Down Expand Up @@ -995,6 +1003,64 @@ func (p *PravegaCluster) validatePravegaVersion() error {
return nil
}

func (p *PravegaCluster) validateConfigMap() error {

configmap := &corev1.ConfigMap{}
err := Mgr.GetClient().Get(context.TODO(),
types.NamespacedName{Name: p.ConfigMapNameForController(), Namespace: p.Namespace}, configmap)
if err != nil {
if errors.IsNotFound(err) {
return nil
} else {
return fmt.Errorf("failed to get configmap (%s): %v", configmap.Name, err)
}
}
if val, ok := p.Spec.Pravega.Options["controller.containerCount"]; ok {
checkstring := fmt.Sprintf("-Dcontroller.containerCount=%v", val)
eq := strings.Contains(configmap.Data["JAVA_OPTS"], checkstring)
if !eq {
return fmt.Errorf("controller.containerCount should not be changed ")
}
}
if val, ok := p.Spec.Pravega.Options["controller.container.count"]; ok {
checkstring := fmt.Sprintf("-Dcontroller.container.count=%v", val)
eq := strings.Contains(configmap.Data["JAVA_OPTS"], checkstring)
if !eq {
return fmt.Errorf("controller.container.count should not be changed ")
}
}
if val, ok := p.Spec.Pravega.Options["pravegaservice.containerCount"]; ok {
checkstring := fmt.Sprintf("-Dpravegaservice.containerCount=%v", val)
eq := strings.Contains(configmap.Data["JAVA_OPTS"], checkstring)
if !eq {
return fmt.Errorf("pravegaservice.containerCount should not be changed ")
}
}
if val, ok := p.Spec.Pravega.Options["pravegaservice.container.count"]; ok {
checkstring := fmt.Sprintf("-Dpravegaservice.container.count=%v", val)
eq := strings.Contains(configmap.Data["JAVA_OPTS"], checkstring)
if !eq {
return fmt.Errorf("pravegaservice.container.count should not be changed ")
}
}
if val, ok := p.Spec.Pravega.Options["bookkeeper.bkLedgerPath"]; ok {
checkstring := fmt.Sprintf("-Dbookkeeper.bkLedgerPath=%v", val)
eq := strings.Contains(configmap.Data["JAVA_OPTS"], checkstring)
if !eq {
return fmt.Errorf("bookkeeper.bkLedgerPath should not be changed ")
}
}
if val, ok := p.Spec.Pravega.Options["bookkeeper.ledger.path"]; ok {
checkstring := fmt.Sprintf("-Dbookkeeper.ledger.path=%v", val)
eq := strings.Contains(configmap.Data["JAVA_OPTS"], checkstring)
if !eq {
return fmt.Errorf("bookkeeper.ledger.path should not be changed ")
}
}
log.Print("validateConfigMap:: No error found...returning...")
return nil
}

//to return name of segmentstore based on the version
func (p *PravegaCluster) StatefulSetNameForSegmentstore() string {
if util.IsVersionBelow07(p.Spec.Version) {
Expand Down
3 changes: 3 additions & 0 deletions pkg/controller/pravega/pravega_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package pravega

import (
"fmt"
"sort"
"strings"

api "github.com/pravega/pravega-operator/pkg/apis/pravega/v1beta1"
Expand Down Expand Up @@ -201,6 +202,8 @@ func MakeControllerConfigMap(p *api.PravegaCluster) *corev1.ConfigMap {
javaOpts = append(javaOpts, fmt.Sprintf("-D%v=%v", name, value))
}

sort.Strings(javaOpts)

authEnabledStr := fmt.Sprint(p.Spec.Authentication.IsEnabled())
configData := map[string]string{
"CLUSTER_NAME": p.Name,
Expand Down
4 changes: 4 additions & 0 deletions pkg/controller/pravega/pravega_segmentstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package pravega

import (
"fmt"
"sort"
"strings"

api "github.com/pravega/pravega-operator/pkg/apis/pravega/v1beta1"
Expand Down Expand Up @@ -220,6 +221,9 @@ func MakeSegmentstoreConfigMap(p *api.PravegaCluster) *corev1.ConfigMap {
for name, value := range p.Spec.Pravega.Options {
javaOpts = append(javaOpts, fmt.Sprintf("-D%v=%v", name, value))
}

sort.Strings(javaOpts)

authEnabledStr := fmt.Sprint(p.Spec.Authentication.IsEnabled())
configData := map[string]string{
"AUTHORIZATION_ENABLED": authEnabledStr,
Expand Down
207 changes: 193 additions & 14 deletions pkg/controller/pravegacluster/pravegacluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,11 @@ func (r *ReconcilePravegaCluster) run(p *pravegav1beta1.PravegaCluster) (err err
return fmt.Errorf("failed to reconcile finalizers %v", err)
}

err = r.reconcileConfigMap(p)
if err != nil {
return fmt.Errorf("failed to reconcile configMap %v", err)
}

err = r.deployCluster(p)
if err != nil {
return fmt.Errorf("failed to deploy cluster: %v", err)
Expand Down Expand Up @@ -187,6 +192,86 @@ func (r *ReconcilePravegaCluster) reconcileFinalizers(p *pravegav1beta1.PravegaC
return nil
}

func (r *ReconcilePravegaCluster) reconcileConfigMap(p *pravegav1beta1.PravegaCluster) (err error) {

err = r.reconcileControllerConfigMap(p)
if err != nil {
return err
}

err = r.reconcileSegmentStoreConfigMap(p)
if err != nil {
return err
}

return nil

}

func (r *ReconcilePravegaCluster) reconcileControllerConfigMap(p *pravegav1beta1.PravegaCluster) (err error) {

currentConfigMap := &corev1.ConfigMap{}
configMap := pravega.MakeControllerConfigMap(p)
controllerutil.SetControllerReference(p, configMap, r.scheme)
err = r.client.Get(context.TODO(), types.NamespacedName{Name: p.ConfigMapNameForController(), Namespace: p.Namespace}, currentConfigMap)
if err != nil {
if errors.IsNotFound(err) {
err = r.client.Create(context.TODO(), configMap)
if err != nil && !errors.IsAlreadyExists(err) {
return err
}
}
} else {
currentConfigMap := &corev1.ConfigMap{}
err = r.client.Get(context.TODO(), types.NamespacedName{Name: p.ConfigMapNameForController(), Namespace: p.Namespace}, currentConfigMap)
eq := util.CompareConfigMap(currentConfigMap, configMap)
if !eq {
err := r.client.Update(context.TODO(), configMap)
if err != nil {
return err
}
//restarting controller pods
err = r.restartDeploymentPod(p)
if err != nil {
return err
}
}
}
return nil
}

func (r *ReconcilePravegaCluster) reconcileSegmentStoreConfigMap(p *pravegav1beta1.PravegaCluster) (err error) {

currentConfigMap := &corev1.ConfigMap{}
configMap := pravega.MakeSegmentstoreConfigMap(p)
controllerutil.SetControllerReference(p, configMap, r.scheme)
err = r.client.Get(context.TODO(), types.NamespacedName{Name: p.ConfigMapNameForSegmentstore(), Namespace: p.Namespace}, currentConfigMap)
if err != nil {
if errors.IsNotFound(err) {
err = r.client.Create(context.TODO(), configMap)
if err != nil && !errors.IsAlreadyExists(err) {
return err
}
}
} else {
currentConfigMap := &corev1.ConfigMap{}
err = r.client.Get(context.TODO(), types.NamespacedName{Name: p.ConfigMapNameForSegmentstore(), Namespace: p.Namespace}, currentConfigMap)
eq := util.CompareConfigMap(currentConfigMap, configMap)
if !eq {
err := r.client.Update(context.TODO(), configMap)
if err != nil {
return err
}
//restarting sts pods
err = r.restartStsPod(p)
if err != nil {
return err
}
}
}
return nil
}

func (r *ReconcilePravegaCluster) cleanUpZookeeperMeta(p *pravegav1beta1.PravegaCluster) (err error) {
if err = p.WaitForClusterToTerminate(r.client); err != nil {
return fmt.Errorf("failed to wait for cluster pods termination (%s): %v", p.Name, err)
Expand Down Expand Up @@ -316,13 +401,6 @@ func (r *ReconcilePravegaCluster) deployController(p *pravegav1beta1.PravegaClus
return err
}

configMap := pravega.MakeControllerConfigMap(p)
controllerutil.SetControllerReference(p, configMap, r.scheme)
err = r.client.Create(context.TODO(), configMap)
if err != nil && !errors.IsAlreadyExists(err) {
return err
}

deployment := pravega.MakeControllerDeployment(p)
controllerutil.SetControllerReference(p, deployment, r.scheme)
err = r.client.Create(context.TODO(), deployment)
Expand Down Expand Up @@ -367,13 +445,6 @@ func (r *ReconcilePravegaCluster) deploySegmentStore(p *pravegav1beta1.PravegaCl
return err
}

configMap := pravega.MakeSegmentstoreConfigMap(p)
controllerutil.SetControllerReference(p, configMap, r.scheme)
err = r.client.Create(context.TODO(), configMap)
if err != nil && !errors.IsAlreadyExists(err) {
return err
}

statefulSet := pravega.MakeSegmentStoreStatefulSet(p)
controllerutil.SetControllerReference(p, statefulSet, r.scheme)
if statefulSet.Spec.VolumeClaimTemplates != nil {
Expand Down Expand Up @@ -416,6 +487,114 @@ func hasOldVersionOwnerReference(ownerreference []metav1.OwnerReference) bool {
return false
}

func (r *ReconcilePravegaCluster) restartStsPod(p *pravegav1beta1.PravegaCluster) error {

currentSts := &appsv1.StatefulSet{}
err := r.client.Get(context.TODO(), types.NamespacedName{Name: p.StatefulSetNameForSegmentstore(), Namespace: p.Namespace}, currentSts)
if err != nil {
return err
}
selector, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{
MatchLabels: currentSts.Spec.Template.Labels,
})
if err != nil {
return fmt.Errorf("failed to convert label selector: %v", err)
}
podList := &corev1.PodList{}
podlistOps := &client.ListOptions{
Namespace: currentSts.Namespace,
LabelSelector: selector,
}
err = r.client.List(context.TODO(), podList, podlistOps)
if err != nil {
return err
}

for _, podItem := range podList.Items {
err := r.client.Delete(context.TODO(), &podItem)
if err != nil {
return err
} else {
start := time.Now()
pod := &corev1.Pod{}
err = r.client.Get(context.TODO(), types.NamespacedName{Name: podItem.ObjectMeta.Name, Namespace: podItem.ObjectMeta.Namespace}, pod)
for util.IsPodReady(pod) {
if time.Since(start) > 10*time.Minute {
return fmt.Errorf("failed to delete Segmentstore pod (%s) for 10 mins ", podItem.ObjectMeta.Name)
}
err = r.client.Get(context.TODO(), types.NamespacedName{Name: podItem.ObjectMeta.Name, Namespace: podItem.ObjectMeta.Namespace}, pod)
}
start = time.Now()
err = r.client.Get(context.TODO(), types.NamespacedName{Name: podItem.ObjectMeta.Name, Namespace: podItem.ObjectMeta.Namespace}, pod)
for !util.IsPodReady(pod) {
if time.Since(start) > 10*time.Minute {
return fmt.Errorf("failed to get Segmentstore pod (%s) as ready for 10 mins ", podItem.ObjectMeta.Name)
}
err = r.client.Get(context.TODO(), types.NamespacedName{Name: podItem.ObjectMeta.Name, Namespace: podItem.ObjectMeta.Namespace}, pod)
}
}
}
return nil
}

func (r *ReconcilePravegaCluster) restartDeploymentPod(p *pravegav1beta1.PravegaCluster) error {

currentDeployment := &appsv1.Deployment{}
err := r.client.Get(context.TODO(), types.NamespacedName{Name: p.DeploymentNameForController(), Namespace: p.Namespace}, currentDeployment)
if err != nil {
return err
}
selector, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{
MatchLabels: currentDeployment.Spec.Template.Labels,
})
if err != nil {
return fmt.Errorf("failed to convert label selector: %v", err)
}
podList := &corev1.PodList{}
podlistOps := &client.ListOptions{
Namespace: currentDeployment.Namespace,
LabelSelector: selector,
}
err = r.client.List(context.TODO(), podList, podlistOps)
if err != nil {
return err
}

for _, podItem := range podList.Items {
err := r.client.Delete(context.TODO(), &podItem)
if err != nil {
return err
} else {
start := time.Now()
pod := &corev1.Pod{}
err = r.client.Get(context.TODO(), types.NamespacedName{Name: podItem.ObjectMeta.Name, Namespace: podItem.ObjectMeta.Namespace}, pod)
for util.IsPodReady(pod) {
if time.Since(start) > 10*time.Minute {
return fmt.Errorf("failed to delete controller pod (%s) for 10 mins ", podItem.ObjectMeta.Name)
}
err = r.client.Get(context.TODO(), types.NamespacedName{Name: podItem.ObjectMeta.Name, Namespace: podItem.ObjectMeta.Namespace}, pod)
}
deploy := &appsv1.Deployment{}
name := p.DeploymentNameForController()
err = r.client.Get(context.TODO(), types.NamespacedName{Name: name, Namespace: p.Namespace}, deploy)
if err != nil {
return fmt.Errorf("failed to get deployment (%s): %v", deploy.Name, err)
}
start = time.Now()
for deploy.Status.ReadyReplicas != deploy.Status.Replicas {
if time.Since(start) > 10*time.Minute {
return fmt.Errorf("failed to make controller pod ready for 10 mins ")
}
err = r.client.Get(context.TODO(), types.NamespacedName{Name: name, Namespace: p.Namespace}, deploy)
if err != nil {
return fmt.Errorf("failed to get deployment (%s): %v", deploy.Name, err)
}
}
}
}
return nil
}

func (r *ReconcilePravegaCluster) syncClusterSize(p *pravegav1beta1.PravegaCluster) (err error) {
/*We skip calling syncSegmentStoreSize() during upgrade/rollback from version 07*/
if !r.IsClusterUpgradingTo07(p) && !r.IsClusterRollbackingFrom07(p) {
Expand Down
9 changes: 9 additions & 0 deletions pkg/util/pravegacluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package util

import (
"fmt"
"reflect"
"regexp"
"strconv"
"strings"
Expand Down Expand Up @@ -46,6 +47,14 @@ func IsVersionBelow07(ver string) bool {
return false
}

func CompareConfigMap(cm1 *corev1.ConfigMap, cm2 *corev1.ConfigMap) bool {
eq := reflect.DeepEqual(cm1.Data, cm2.Data)
if eq {
return true
}
return false
}

func IsOrphan(k8sObjectName string, replicas int32) bool {
index := strings.LastIndexAny(k8sObjectName, "-")
if index == -1 {
Expand Down
Loading