Skip to content

Commit

Permalink
Child resource recreation (#208)
Browse files Browse the repository at this point in the history
* Recreate gateway resources deleted during running phase

* Move hasher to common

* Delete gateway resources on spec change

* Fix service update condition

* Return error

* Refactor common code

* Handle resource change on error

* Use slower rate limiter

* Fix group version kind

* Apply resource recreation to sensors

* Fix state change condition

* Add more tests

* Remove unnecessary imports

* Update Gopkg.lock

* Fix tests

* Add informer event tests

* Compare GroupVersionKind instead of just Kind

* Fix data race

* Rename owner variables

* Improve gateway resource update logic

* Refactor the resource handling context

* Refactor sensor resource context

* Refactor gateway resource recreation

* Improve sensor resource update logic

* Fix state change condition

* Improve error handling

* Update types of resource specs

* Set up containers surely

* Fix wrong label and annotation names

* Update sensor examples

* Fix tests

* Fix race condition

* Improve fake controllers
  • Loading branch information
dtaniwaki authored and VaibhavPage committed Mar 12, 2019
1 parent e362cbc commit cb8ebd8
Show file tree
Hide file tree
Showing 77 changed files with 3,635 additions and 1,164 deletions.
69 changes: 68 additions & 1 deletion Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ const (

// EnvVarSensorControllerConfigMap is the name of the configmap to use for the sensor-controller
EnvVarSensorControllerConfigMap = "SENSOR_CONFIG_MAP"

// AnnotationSensorResourceSpecHashName is the annotation of a sensor resource spec hash
AnnotationSensorResourceSpecHashName = sensor.FullName + "/resource-spec-hash"
)

// SENSOR CONSTANTS
Expand Down Expand Up @@ -125,6 +128,9 @@ const (
// LabelGatewayEventSourceID is the label for gateway configuration ID
LabelGatewayEventSourceID = "event-source-id"

// AnnotationGatewayResourceSpecHashName is the annotation of a gateway resource spec hash
AnnotationGatewayResourceSpecHashName = gateway.FullName + "/resource-spec-hash"

// Server Connection Timeout, 10 seconds
ServerConnTimeout = 10
)
Expand Down
18 changes: 18 additions & 0 deletions common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ limitations under the License.
package common

import (
"encoding/json"
"fmt"
"hash/fnv"
"os"
"time"

Expand Down Expand Up @@ -108,3 +110,19 @@ func LoggerConf() zerolog.ConsoleWriter {
func GetLoggerContext(opt zerolog.ConsoleWriter) zerolog.Context {
return zerolog.New(opt).With().Timestamp()
}

// Hasher hashes a string
func Hasher(value string) string {
h := fnv.New32a()
_, _ = h.Write([]byte(value))
return fmt.Sprintf("%v", h.Sum32())
}

// GetObjectHash returns hash of a given object
func GetObjectHash(obj metav1.Object) (string, error) {
b, err := json.Marshal(obj)
if err != nil {
return "", fmt.Errorf("failed to marshal resource")
}
return Hasher(string(b)), nil
}
19 changes: 18 additions & 1 deletion common/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,31 @@ limitations under the License.
package common

import (
"github.com/smartystreets/goconvey/convey"
"testing"

"github.com/smartystreets/goconvey/convey"

"github.com/stretchr/testify/assert"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/kubernetes/fake"
)

func TestGetObjectHash(t *testing.T) {
convey.Convey("Given a value, hash it", t, func() {
hash, err := GetObjectHash(&corev1.Pod{})
convey.So(hash, convey.ShouldNotBeEmpty)
convey.So(err, convey.ShouldBeEmpty)
})
}

func TestHasher(t *testing.T) {
convey.Convey("Given a value, hash it", t, func() {
hash := Hasher("test")
convey.So(hash, convey.ShouldNotBeEmpty)
})
}

func TestDefaultConfigMapName(t *testing.T) {
res := DefaultConfigMapName("sensor-controller")
assert.Equal(t, "sensor-controller-configmap", res)
Expand Down
86 changes: 86 additions & 0 deletions controllers/common/informer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package common

import (
"fmt"

"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/informers"
informersv1 "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
)

// ArgoEventInformerFactory holds values to create SharedInformerFactory of argo-events
type ArgoEventInformerFactory struct {
OwnerGroupVersionKind schema.GroupVersionKind
OwnerInformer cache.SharedIndexInformer
informers.SharedInformerFactory
Queue workqueue.RateLimitingInterface
}

// NewPodInformer returns a PodInformer of argo-events
func (c *ArgoEventInformerFactory) NewPodInformer() informersv1.PodInformer {
podInformer := c.SharedInformerFactory.Core().V1().Pods()
podInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
DeleteFunc: func(obj interface{}) {
owner, err := c.getOwner(obj)
if err != nil {
return
}
key, err := cache.MetaNamespaceKeyFunc(owner)
if err == nil {
c.Queue.Add(key)
}
},
},
)
return podInformer
}

// NewServiceInformer returns a ServiceInformer of argo-events
func (c *ArgoEventInformerFactory) NewServiceInformer() informersv1.ServiceInformer {
svcInformer := c.SharedInformerFactory.Core().V1().Services()
svcInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
DeleteFunc: func(obj interface{}) {
owner, err := c.getOwner(obj)
if err != nil {
return
}
key, err := cache.MetaNamespaceKeyFunc(owner)
if err == nil {
c.Queue.Add(key)
}
},
},
)
return svcInformer
}

func (c *ArgoEventInformerFactory) getOwner(obj interface{}) (interface{}, error) {
m, err := meta.Accessor(obj)
if err != nil {
return nil, err
}
for _, owner := range m.GetOwnerReferences() {

if owner.APIVersion == c.OwnerGroupVersionKind.GroupVersion().String() &&
owner.Kind == c.OwnerGroupVersionKind.Kind {
key := owner.Name
if len(m.GetNamespace()) > 0 {
key = m.GetNamespace() + "/" + key
}
obj, exists, err := c.OwnerInformer.GetIndexer().GetByKey(key)
if err != nil {
return nil, err
}
if !exists {
return nil, fmt.Errorf("failed to get object from cache")
}
return obj, nil
}
}
return nil, fmt.Errorf("no owner found")
}
Loading

0 comments on commit cb8ebd8

Please sign in to comment.