Skip to content

Commit

Permalink
fix duplicate when using self-service (#227)
Browse files Browse the repository at this point in the history
* fix duplicate when using self-service

Signed-off-by: May Zhang <[email protected]>

* fix duplicate when using self-service

Signed-off-by: May Zhang <[email protected]>

* fix tests

Signed-off-by: May Zhang <[email protected]>

* -s

Signed-off-by: May Zhang <[email protected]>

* add test cases

Signed-off-by: May Zhang <[email protected]>

* remove not used method

Signed-off-by: May Zhang <[email protected]>

---------

Signed-off-by: May Zhang <[email protected]>
  • Loading branch information
mayzhang2000 authored Oct 11, 2023
1 parent c7e26ba commit 2d2d1a7
Show file tree
Hide file tree
Showing 7 changed files with 193 additions and 20 deletions.
1 change: 1 addition & 0 deletions pkg/api/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type Config struct {
// ServiceDefaultTriggers holds list of default triggers per service
ServiceDefaultTriggers map[string][]string
Namespace string
IsSelfServiceConfig bool
}

// Returns list of destinations for the specified trigger
Expand Down
4 changes: 4 additions & 0 deletions pkg/api/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,10 @@ func (f *apiFactory) getApiFromConfigmapAndSecret(cm *v1.ConfigMap, secret *v1.S
if err != nil {
return nil, err
}

if cm.Namespace != f.Settings.DefaultNamespace {
cfg.IsSelfServiceConfig = true
}
getVars, err := f.InitGetVars(cfg, cm, secret)
if err != nil {
return nil, err
Expand Down
29 changes: 26 additions & 3 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,11 @@ func (c *notificationController) Run(threadiness int, stopCh <-chan struct{}) {
log.Warn("Controller has stopped.")
}

// check if an api is a self-service API
func (c *notificationController) isSelfServiceConfigureApi(api api.API) bool {
return c.namespaceSupport && api.GetConfig().IsSelfServiceConfig
}

func (c *notificationController) processResourceWithAPI(api api.API, resource v1.Object, logEntry *log.Entry, eventSequence *NotificationEventSequence) (map[string]string, error) {
apiNamespace := api.GetConfig().Namespace
notificationsState := NewStateFromRes(resource)
Expand Down Expand Up @@ -209,13 +214,13 @@ func (c *notificationController) processResourceWithAPI(api api.API, resource v1

if !cr.Triggered {
for _, to := range destinations {
notificationsState.SetAlreadyNotified(trigger, cr, to, false)
notificationsState.SetAlreadyNotified(c.isSelfServiceConfigureApi(api), apiNamespace, trigger, cr, to, false)
}
continue
}

for _, to := range destinations {
if changed := notificationsState.SetAlreadyNotified(trigger, cr, to, true); !changed {
if changed := notificationsState.SetAlreadyNotified(c.isSelfServiceConfigureApi(api), apiNamespace, trigger, cr, to, true); !changed {
logEntry.Infof("Notification about condition '%s.%s' already sent to '%v' using the configuration in namespace %s", trigger, cr.Key, to, apiNamespace)
eventSequence.addDelivered(NotificationDelivery{
Trigger: trigger,
Expand All @@ -227,7 +232,7 @@ func (c *notificationController) processResourceWithAPI(api api.API, resource v1
if err := api.Send(un.Object, cr.Templates, to); err != nil {
logEntry.Errorf("Failed to notify recipient %s defined in resource %s/%s: %v using the configuration in namespace %s",
to, resource.GetNamespace(), resource.GetName(), err, apiNamespace)
notificationsState.SetAlreadyNotified(trigger, cr, to, false)
notificationsState.SetAlreadyNotified(c.isSelfServiceConfigureApi(api), apiNamespace, trigger, cr, to, false)
c.metricsRegistry.IncDeliveriesCounter(trigger, to.Service, false)
eventSequence.addError(fmt.Errorf("failed to deliver notification %s to %s: %v using the configuration in namespace %s", trigger, to, err, apiNamespace))
} else {
Expand Down Expand Up @@ -320,6 +325,24 @@ func (c *notificationController) processQueueItem() (processNext bool) {
}
for _, api := range apisWithNamespace {
c.processResource(api, resource, logEntry, &eventSequence)

//refresh
obj, exists, err := c.informer.GetIndexer().GetByKey(key.(string))
if err != nil {
log.Errorf("Failed to get resource '%s' from informer index: %+v", key, err)
eventSequence.addError(err)
return
}
if !exists {
// This happens after resource was deleted, but the work queue still had an entry for it.
return
}
resource, ok = obj.(v1.Object)
if !ok {
log.Errorf("Failed to get resource '%s' from informer index: %+v", key, err)
eventSequence.addError(err)
return
}
}
}
logEntry.Info("Processing completed")
Expand Down
148 changes: 142 additions & 6 deletions pkg/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
kubetesting "k8s.io/client-go/testing"
"k8s.io/client-go/tools/cache"

"github.com/argoproj/notifications-engine/pkg/api"
notificationApi "github.com/argoproj/notifications-engine/pkg/api"
"github.com/argoproj/notifications-engine/pkg/mocks"
"github.com/argoproj/notifications-engine/pkg/services"
"github.com/argoproj/notifications-engine/pkg/subscriptions"
Expand Down Expand Up @@ -72,7 +72,6 @@ func newController(t *testing.T, ctx context.Context, client dynamic.Interface,
mockCtrl.Finish()
}()
mockAPI := mocks.NewMockAPI(mockCtrl)
mockAPI.EXPECT().GetConfig().Return(api.Config{}).AnyTimes()
resourceClient := client.Resource(testGVR)
informer := cache.NewSharedIndexInformer(
&cache.ListWatch{
Expand All @@ -98,6 +97,45 @@ func newController(t *testing.T, ctx context.Context, client dynamic.Interface,
return c, mockAPI, nil
}

func newControllerWithNamespaceSupport(t *testing.T, ctx context.Context, client dynamic.Interface, opts ...Opts) (*notificationController, map[string]notificationApi.API, error) {
mockCtrl := gomock.NewController(t)
go func() {
<-ctx.Done()
mockCtrl.Finish()
}()

resourceClient := client.Resource(testGVR)
informer := cache.NewSharedIndexInformer(
&cache.ListWatch{
ListFunc: func(options v1.ListOptions) (object runtime.Object, err error) {
return resourceClient.List(context.Background(), options)
},
WatchFunc: func(options v1.ListOptions) (watch.Interface, error) {
return resourceClient.Watch(context.Background(), options)
},
},
&unstructured.Unstructured{},
time.Minute,
cache.Indexers{},
)

go informer.Run(ctx.Done())

apiMap := make(map[string]notificationApi.API)
mockAPIDefault := mocks.NewMockAPI(mockCtrl)
apiMap["default"] = mockAPIDefault

mockAPISelfService := mocks.NewMockAPI(mockCtrl)
apiMap["selfservice_namespace"] = mockAPISelfService

c := NewControllerWithNamespaceSupport(resourceClient, informer, &mocks.FakeFactory{ApiMap: apiMap}, opts...)
if !cache.WaitForCacheSync(ctx.Done(), informer.HasSynced) {
return nil, nil, errors.New("failed to sync informers")
}

return c, apiMap, nil
}

func TestSendsNotificationIfTriggered(t *testing.T) {
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
Expand All @@ -109,6 +147,7 @@ func TestSendsNotificationIfTriggered(t *testing.T) {
assert.NoError(t, err)

receivedObj := map[string]interface{}{}
api.EXPECT().GetConfig().Return(notificationApi.Config{}).AnyTimes()
api.EXPECT().RunTrigger("my-trigger", gomock.Any()).Return([]triggers.ConditionResult{{Triggered: true, Templates: []string{"test"}}}, nil)
api.EXPECT().Send(mock.MatchedBy(func(obj map[string]interface{}) bool {
receivedObj = obj
Expand All @@ -123,22 +162,23 @@ func TestSendsNotificationIfTriggered(t *testing.T) {
assert.NoError(t, err)

state := NewState(annotations[notifiedAnnotationKey])
assert.NotNil(t, state[StateItemKey("mock", triggers.ConditionResult{}, services.Destination{Service: "mock", Recipient: "recipient"})])
assert.NotNil(t, state[StateItemKey(false, "", "mock", triggers.ConditionResult{}, services.Destination{Service: "mock", Recipient: "recipient"})])
assert.Equal(t, app.Object, receivedObj)
}

func TestDoesNotSendNotificationIfAnnotationPresent(t *testing.T) {
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
state := NotificationsState{}
_ = state.SetAlreadyNotified("my-trigger", triggers.ConditionResult{}, services.Destination{Service: "mock", Recipient: "recipient"}, true)
_ = state.SetAlreadyNotified(false, "", "my-trigger", triggers.ConditionResult{}, services.Destination{Service: "mock", Recipient: "recipient"}, true)
app := newResource("test", withAnnotations(map[string]string{
subscriptions.SubscribeAnnotationKey("my-trigger", "mock"): "recipient",
notifiedAnnotationKey: mustToJson(state),
}))
ctrl, api, err := newController(t, ctx, newFakeClient(app))
assert.NoError(t, err)

api.EXPECT().GetConfig().Return(notificationApi.Config{}).AnyTimes()
api.EXPECT().RunTrigger("my-trigger", gomock.Any()).Return([]triggers.ConditionResult{{Triggered: true, Templates: []string{"test"}}}, nil)

_, err = ctrl.processResourceWithAPI(api, app, logEntry, &NotificationEventSequence{})
Expand All @@ -153,14 +193,15 @@ func TestRemovesAnnotationIfNoTrigger(t *testing.T) {
defer cancel()

state := NotificationsState{}
_ = state.SetAlreadyNotified("my-trigger", triggers.ConditionResult{}, services.Destination{Service: "mock", Recipient: "recipient"}, true)
_ = state.SetAlreadyNotified(false, "", "my-trigger", triggers.ConditionResult{}, services.Destination{Service: "mock", Recipient: "recipient"}, true)
app := newResource("test", withAnnotations(map[string]string{
subscriptions.SubscribeAnnotationKey("my-trigger", "mock"): "recipient",
notifiedAnnotationKey: mustToJson(state),
}))
ctrl, api, err := newController(t, ctx, newFakeClient(app))
assert.NoError(t, err)

api.EXPECT().GetConfig().Return(notificationApi.Config{}).AnyTimes()
api.EXPECT().RunTrigger("my-trigger", gomock.Any()).Return([]triggers.ConditionResult{{Triggered: false}}, nil)

annotations, err := ctrl.processResourceWithAPI(api, app, logEntry, &NotificationEventSequence{})
Expand All @@ -177,7 +218,7 @@ func TestUpdatedAnnotationsSavedAsPatch(t *testing.T) {
defer cancel()

state := NotificationsState{}
_ = state.SetAlreadyNotified("my-trigger", triggers.ConditionResult{}, services.Destination{Service: "mock", Recipient: "recipient"}, true)
_ = state.SetAlreadyNotified(false, "", "my-trigger", triggers.ConditionResult{}, services.Destination{Service: "mock", Recipient: "recipient"}, true)

app := newResource("test", withAnnotations(map[string]string{
subscriptions.SubscribeAnnotationKey("my-trigger", "mock"): "recipient",
Expand All @@ -193,6 +234,7 @@ func TestUpdatedAnnotationsSavedAsPatch(t *testing.T) {
})
ctrl, api, err := newController(t, ctx, client)
assert.NoError(t, err)
api.EXPECT().GetConfig().Return(notificationApi.Config{}).AnyTimes()
api.EXPECT().RunTrigger("my-trigger", gomock.Any()).Return([]triggers.ConditionResult{{Triggered: false}}, nil)

go ctrl.Run(1, ctx.Done())
Expand Down Expand Up @@ -324,6 +366,7 @@ func TestWithEventCallback(t *testing.T) {

ctrl, api, err := newController(t, ctx, newFakeClient(app), WithEventCallback(mockEventCallback))
ctrl.namespaceSupport = false
api.EXPECT().GetConfig().Return(notificationApi.Config{}).AnyTimes()
assert.NoError(t, err)
ctrl.apiFactory = &mocks.FakeFactory{Api: api, Err: tc.apiErr}

Expand All @@ -349,3 +392,96 @@ func TestWithEventCallback(t *testing.T) {
})
}
}

// verify annotations after calling processResourceWithAPI when using self-service
func TestProcessResourceWithAPIWithSelfService(t *testing.T) {
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
app := newResource("test", withAnnotations(map[string]string{
subscriptions.SubscribeAnnotationKey("my-trigger", "mock"): "recipient",
}))

ctrl, api, err := newController(t, ctx, newFakeClient(app))
assert.NoError(t, err)
ctrl.namespaceSupport = true

trigger := "my-trigger"
namespace := "my-namespace"

receivedObj := map[string]interface{}{}

//SelfService API: config has IsSelfServiceConfig set to true
api.EXPECT().GetConfig().Return(notificationApi.Config{IsSelfServiceConfig: true, Namespace: namespace}).AnyTimes()
api.EXPECT().RunTrigger(trigger, gomock.Any()).Return([]triggers.ConditionResult{{Triggered: true, Templates: []string{"test"}}}, nil)
api.EXPECT().Send(mock.MatchedBy(func(obj map[string]interface{}) bool {
receivedObj = obj
return true
}), []string{"test"}, services.Destination{Service: "mock", Recipient: "recipient"}).Return(nil)

annotations, err := ctrl.processResourceWithAPI(api, app, logEntry, &NotificationEventSequence{})
if err != nil {
logEntry.Errorf("Failed to process: %v", err)
}

assert.NoError(t, err)

state := NewState(annotations[notifiedAnnotationKey])
assert.NotZero(t, state[StateItemKey(true, namespace, trigger, triggers.ConditionResult{}, services.Destination{Service: "mock", Recipient: "recipient"})])
assert.Equal(t, app.Object, receivedObj)
}

// verify notification sent to both default and self-service configuration after calling processResourceWithAPI when using self-service
func TestProcessItemsWithSelfService(t *testing.T) {
const triggerName = "my-trigger"
destination := services.Destination{Service: "mock", Recipient: "recipient"}

var actualSequence *NotificationEventSequence
mockEventCallback := func(eventSequence NotificationEventSequence) {
actualSequence = &eventSequence
}

ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
app := newResource("test", withAnnotations(map[string]string{
subscriptions.SubscribeAnnotationKey("my-trigger", "mock"): "recipient",
}))

ctrl, apiMap, err := newControllerWithNamespaceSupport(t, ctx, newFakeClient(app), WithEventCallback(mockEventCallback))
assert.NoError(t, err)

ctrl.namespaceSupport = true
//SelfService API: config has IsSelfServiceConfig set to true
apiMap["selfservice_namespace"].(*mocks.MockAPI).EXPECT().GetConfig().Return(notificationApi.Config{IsSelfServiceConfig: true, Namespace: "selfservice_namespace"}).Times(3)
apiMap["selfservice_namespace"].(*mocks.MockAPI).EXPECT().RunTrigger(triggerName, gomock.Any()).Return([]triggers.ConditionResult{{Triggered: true, Templates: []string{"test"}}}, nil)
apiMap["selfservice_namespace"].(*mocks.MockAPI).EXPECT().Send(mock.MatchedBy(func(obj map[string]interface{}) bool {
return true
}), []string{"test"}, destination).Return(nil).AnyTimes()

apiMap["default"].(*mocks.MockAPI).EXPECT().GetConfig().Return(notificationApi.Config{IsSelfServiceConfig: false, Namespace: "default"}).Times(3)
apiMap["default"].(*mocks.MockAPI).EXPECT().RunTrigger(triggerName, gomock.Any()).Return([]triggers.ConditionResult{{Triggered: true, Templates: []string{"test"}}}, nil)
apiMap["default"].(*mocks.MockAPI).EXPECT().Send(mock.MatchedBy(func(obj map[string]interface{}) bool {
return true
}), []string{"test"}, destination).Return(nil).AnyTimes()

ctrl.apiFactory = &mocks.FakeFactory{ApiMap: apiMap}

ctrl.processQueueItem()

assert.Equal(t, app, actualSequence.Resource)

expectedDeliveries := []NotificationDelivery{
{
Trigger: triggerName,
Destination: destination,
},
{
Trigger: triggerName,
Destination: destination,
},
}
for i, event := range actualSequence.Delivered {
assert.Equal(t, expectedDeliveries[i].Trigger, event.Trigger)
assert.Equal(t, expectedDeliveries[i].Destination, event.Destination)
}

}
13 changes: 9 additions & 4 deletions pkg/controller/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,13 @@ const (
notifiedHistoryMaxSize = 100
)

func StateItemKey(trigger string, conditionResult triggers.ConditionResult, dest services.Destination) string {
key := fmt.Sprintf("%s:%s:%s:%s", trigger, conditionResult.Key, dest.Service, dest.Recipient)
func StateItemKey(isSelfConfig bool, apiNamespace, trigger string, conditionResult triggers.ConditionResult, dest services.Destination) string {
var key string
if isSelfConfig {
key = fmt.Sprintf("%s:%s:%s:%s:%s", apiNamespace, trigger, conditionResult.Key, dest.Service, dest.Recipient)
} else {
key = fmt.Sprintf("%s:%s:%s:%s", trigger, conditionResult.Key, dest.Service, dest.Recipient)
}
if conditionResult.OncePer != "" {
key = conditionResult.OncePer + ":" + key
}
Expand Down Expand Up @@ -47,8 +52,8 @@ func (s NotificationsState) truncate(maxSize int) {
}

// SetAlreadyNotified set the state of given trigger/destination and return if state has been changed
func (s NotificationsState) SetAlreadyNotified(trigger string, result triggers.ConditionResult, dest services.Destination, isNotified bool) bool {
key := StateItemKey(trigger, result, dest)
func (s NotificationsState) SetAlreadyNotified(isSelfConfig bool, apiNamespace, trigger string, result triggers.ConditionResult, dest services.Destination, isNotified bool) bool {
key := StateItemKey(isSelfConfig, apiNamespace, trigger, result, dest)
if _, alreadyNotified := s[key]; alreadyNotified == isNotified {
return false
}
Expand Down
10 changes: 5 additions & 5 deletions pkg/controller/state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,16 @@ func TestSetAlreadyNotified(t *testing.T) {
dest := services.Destination{Service: "slack", Recipient: "my-channel"}

state := NotificationsState{}
changed := state.SetAlreadyNotified("app-synced", triggers.ConditionResult{Key: "0"}, dest, true)
changed := state.SetAlreadyNotified(false, "", "app-synced", triggers.ConditionResult{Key: "0"}, dest, true)

assert.True(t, changed)
_, ok := state["app-synced:0:slack:my-channel"]
assert.True(t, ok)

changed = state.SetAlreadyNotified("app-synced", triggers.ConditionResult{Key: "0"}, dest, true)
changed = state.SetAlreadyNotified(false, "", "app-synced", triggers.ConditionResult{Key: "0"}, dest, true)
assert.False(t, changed)

changed = state.SetAlreadyNotified("app-synced", triggers.ConditionResult{Key: "0"}, dest, false)
changed = state.SetAlreadyNotified(false, "", "app-synced", triggers.ConditionResult{Key: "0"}, dest, false)
assert.True(t, changed)
_, ok = state["app-synced:0:slack:my-channel"]
assert.False(t, ok)
Expand All @@ -45,13 +45,13 @@ func TestSetAlreadyNotified_OncePerItem(t *testing.T) {
dest := services.Destination{Service: "slack", Recipient: "my-channel"}

state := NotificationsState{}
changed := state.SetAlreadyNotified("app-synced", triggers.ConditionResult{OncePer: "abc", Key: "0"}, dest, true)
changed := state.SetAlreadyNotified(false, "", "app-synced", triggers.ConditionResult{OncePer: "abc", Key: "0"}, dest, true)

assert.True(t, changed)
_, ok := state["abc:app-synced:0:slack:my-channel"]
assert.True(t, ok)

changed = state.SetAlreadyNotified("app-synced", triggers.ConditionResult{OncePer: "abc", Key: "0"}, dest, false)
changed = state.SetAlreadyNotified(false, "", "app-synced", triggers.ConditionResult{OncePer: "abc", Key: "0"}, dest, false)
assert.False(t, changed)
_, ok = state["abc:app-synced:0:slack:my-channel"]
assert.True(t, ok)
Expand Down
Loading

0 comments on commit 2d2d1a7

Please sign in to comment.