Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding retry loop to update #1441

Merged
merged 13 commits into from
Sep 13, 2021
4 changes: 4 additions & 0 deletions CHANGELOG.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@
|===
| | Description | PR

| ✨
| Reuse conflict retry loop from client-go/util/retry
| https://github.com/knative/client/pull/1441[#1441]

| ✨
| Deprecate `lookup-path` as path lookup will always be enabled in the future
| https://github.com/knative/client/pull/1422[#1422]
Expand Down
35 changes: 35 additions & 0 deletions pkg/eventing/v1/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import (
"fmt"
"time"

"k8s.io/client-go/util/retry"

apis_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
Expand All @@ -33,6 +35,8 @@ import (
"knative.dev/client/pkg/wait"
)

type TriggerUpdateFunc func(origSource *eventingv1.Trigger) (*eventingv1.Trigger, error)

// KnEventingClient to Eventing Sources. All methods are relative to the
// namespace specified during construction
type KnEventingClient interface {
Expand All @@ -48,6 +52,8 @@ type KnEventingClient interface {
ListTriggers(ctx context.Context) (*eventingv1.TriggerList, error)
// UpdateTrigger is used to update an instance of trigger
UpdateTrigger(ctx context.Context, trigger *eventingv1.Trigger) error
// UpdateTriggerWithRetry is used to update an instance of trigger
UpdateTriggerWithRetry(ctx context.Context, name string, updateFunc TriggerUpdateFunc, nrRetries int) error
// CreateBroker is used to create an instance of broker
CreateBroker(ctx context.Context, broker *eventingv1.Broker) error
// GetBroker is used to get an instance of broker
Expand Down Expand Up @@ -137,6 +143,35 @@ func (c *knEventingClient) UpdateTrigger(ctx context.Context, trigger *eventingv
return nil
}

func (c *knEventingClient) UpdateTriggerWithRetry(ctx context.Context, name string, updateFunc TriggerUpdateFunc, nrRetries int) error {
return updateTriggerWithRetry(ctx, c, name, updateFunc, nrRetries)
}

func updateTriggerWithRetry(ctx context.Context, c KnEventingClient, name string, updateFunc TriggerUpdateFunc, nrRetries int) error {
b := retry.DefaultRetry
b.Steps = nrRetries
err := retry.RetryOnConflict(b, func() error {
vyasgun marked this conversation as resolved.
Show resolved Hide resolved
return updateTrigger(ctx, c, name, updateFunc)
})
rhuss marked this conversation as resolved.
Show resolved Hide resolved
return err
}

func updateTrigger(ctx context.Context, c KnEventingClient, name string, updateFunc TriggerUpdateFunc) error {
trigger, err := c.GetTrigger(ctx, name)
if err != nil {
return err
}
if trigger.GetDeletionTimestamp() != nil {
return fmt.Errorf("can't update trigger %s because it has been marked for deletion", name)
}
updatedSource, err := updateFunc(trigger.DeepCopy())
vyasgun marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return err
}

return c.UpdateTrigger(ctx, updatedSource)
}

// Return the client's namespace
func (c *knEventingClient) Namespace() string {
return c.namespace
Expand Down
4 changes: 4 additions & 0 deletions pkg/eventing/v1/client_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,10 @@ func (c *MockKnEventingClient) UpdateTrigger(ctx context.Context, trigger *event
return mock.ErrorOrNil(call.Result[0])
}

func (c *MockKnEventingClient) UpdateTriggerWithRetry(ctx context.Context, name string, updateFunc TriggerUpdateFunc, nrRetries int) error {
vyasgun marked this conversation as resolved.
Show resolved Hide resolved
return updateTriggerWithRetry(ctx, c, name, updateFunc, nrRetries)
}

// CreateBroker records a call for CreateBroker with the expected error
func (sr *EventingRecorder) CreateBroker(broker interface{}, err error) {
sr.r.Add("CreateBroker", []interface{}{broker}, []interface{}{err})
Expand Down
5 changes: 5 additions & 0 deletions pkg/eventing/v1/client_mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ func TestMockKnClient(t *testing.T) {
recorder.DeleteTrigger("hello", nil)
recorder.ListTriggers(nil, nil)
recorder.UpdateTrigger(&eventingv1.Trigger{}, nil)
recorder.GetTrigger("hello", &eventingv1.Trigger{}, nil)
recorder.UpdateTrigger(&eventingv1.Trigger{}, nil)

recorder.CreateBroker(&eventingv1.Broker{}, nil)
recorder.GetBroker("foo", nil, nil)
Expand All @@ -47,6 +49,9 @@ func TestMockKnClient(t *testing.T) {
client.DeleteTrigger(ctx, "hello")
client.ListTriggers(ctx)
client.UpdateTrigger(ctx, &eventingv1.Trigger{})
client.UpdateTriggerWithRetry(ctx, "hello", func(origSource *eventingv1.Trigger) (*eventingv1.Trigger, error) {
return origSource, nil
vyasgun marked this conversation as resolved.
Show resolved Hide resolved
}, 10)

client.CreateBroker(ctx, &eventingv1.Broker{})
client.GetBroker(ctx, "foo")
Expand Down
118 changes: 118 additions & 0 deletions pkg/eventing/v1/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,124 @@ func TestListTrigger(t *testing.T) {
})
}

func TestUpdateTrigger(t *testing.T) {
serving, client := setup()
t.Run("update trigger will update the trigger",
func(t *testing.T) {
trigger := newTrigger("trigger-1")
errTrigger := newTrigger("errorTrigger")
serving.AddReactor("update", "triggers",
func(a client_testing.Action) (bool, runtime.Object, error) {
newSource := a.(client_testing.UpdateAction).GetObject()
vyasgun marked this conversation as resolved.
Show resolved Hide resolved
name := newSource.(metav1.Object).GetName()
if name == "errorTrigger" {
return true, nil, errors.NewInternalError(fmt.Errorf("mock internal error"))
}
return true, trigger, nil
})
err := client.UpdateTrigger(context.Background(), trigger)
assert.NilError(t, err)
err = client.UpdateTrigger(context.Background(), errTrigger)
assert.ErrorType(t, err, errors.IsInternalError)
})
}

func TestUpdateTriggerWithRetry(t *testing.T) {
serving, client := setup()
var attemptCount, maxAttempts = 0, 5
serving.AddReactor("get", "triggers",
func(a client_testing.Action) (bool, runtime.Object, error) {
name := a.(client_testing.GetAction).GetName()
if name == "deletedTrigger" {
trigger := newTrigger(name)
now := metav1.Now()
trigger.DeletionTimestamp = &now
return true, trigger, nil
}
if name == "getErrorTrigger" {
return true, nil, errors.NewInternalError(fmt.Errorf("mock internal error"))
}
return true, newTrigger(name), nil
})

serving.AddReactor("update", "triggers",
func(a client_testing.Action) (bool, runtime.Object, error) {
newTrigger := a.(client_testing.UpdateAction).GetObject()
name := newTrigger.(metav1.Object).GetName()

if name == "testTrigger" && attemptCount > 0 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

very nice test for checking the retry.

attemptCount--
return true, nil, errors.NewConflict(eventingv1.Resource("trigger"), "errorTrigger", fmt.Errorf("error updating because of conflict"))
}
if name == "errorTrigger" {
return true, nil, errors.NewInternalError(fmt.Errorf("mock internal error"))
}
return true, NewTriggerBuilderFromExisting(newTrigger.(*eventingv1.Trigger)).Build(), nil
})

t.Run("Update trigger successfully without any retries", func(t *testing.T) {
err := client.UpdateTriggerWithRetry(context.Background(), "testTrigger", func(trigger *eventingv1.Trigger) (*eventingv1.Trigger, error) {
return trigger, nil
}, maxAttempts)
assert.NilError(t, err, "No retries required as no conflict error occurred")
})

t.Run("Update trigger with retry after max retries", func(t *testing.T) {
attemptCount = maxAttempts - 1
err := client.UpdateTriggerWithRetry(context.Background(), "testTrigger", func(trigger *eventingv1.Trigger) (*eventingv1.Trigger, error) {
return trigger, nil
}, maxAttempts)
assert.NilError(t, err, "Update retried %d times and succeeded", maxAttempts)
assert.Equal(t, attemptCount, 0)
})

t.Run("Update trigger with retry and fail with conflict after exhausting max retries", func(t *testing.T) {
attemptCount = maxAttempts
err := client.UpdateTriggerWithRetry(context.Background(), "testTrigger", func(trigger *eventingv1.Trigger) (*eventingv1.Trigger, error) {
return trigger, nil
}, maxAttempts)
assert.ErrorType(t, err, errors.IsConflict, "Update retried %d times and failed", maxAttempts)
assert.Equal(t, attemptCount, 0)
})

t.Run("Update trigger with retry and fail with conflict after exhausting max retries", func(t *testing.T) {
attemptCount = maxAttempts
err := client.UpdateTriggerWithRetry(context.Background(), "testTrigger", func(trigger *eventingv1.Trigger) (*eventingv1.Trigger, error) {
return trigger, nil
}, maxAttempts)
assert.ErrorType(t, err, errors.IsConflict, "Update retried %d times and failed", maxAttempts)
assert.Equal(t, attemptCount, 0)
})

t.Run("Update trigger with retry fails with a non conflict error", func(t *testing.T) {
err := client.UpdateTriggerWithRetry(context.Background(), "errorTrigger", func(trigger *eventingv1.Trigger) (*eventingv1.Trigger, error) {
return trigger, nil
}, maxAttempts)
assert.ErrorType(t, err, errors.IsInternalError)
})

t.Run("Update trigger with retry fails with resource already deleted error", func(t *testing.T) {
err := client.UpdateTriggerWithRetry(context.Background(), "deletedTrigger", func(trigger *eventingv1.Trigger) (*eventingv1.Trigger, error) {
return trigger, nil
}, maxAttempts)
assert.ErrorContains(t, err, "marked for deletion")
})

t.Run("Update trigger with retry fails with error from updateFunc", func(t *testing.T) {
err := client.UpdateTriggerWithRetry(context.Background(), "testTrigger", func(trigger *eventingv1.Trigger) (*eventingv1.Trigger, error) {
return trigger, fmt.Errorf("error updating object")
}, maxAttempts)
assert.ErrorContains(t, err, "error updating object")
})

t.Run("Update trigger with retry fails with error from GetTrigger", func(t *testing.T) {
err := client.UpdateTriggerWithRetry(context.Background(), "getErrorTrigger", func(trigger *eventingv1.Trigger) (*eventingv1.Trigger, error) {
return trigger, nil
}, maxAttempts)
assert.ErrorType(t, err, errors.IsInternalError)
})
}

func TestTriggerBuilder(t *testing.T) {
a := NewTriggerBuilder("testtrigger")
a.Filters(map[string]string{"type": "foo"})
Expand Down
36 changes: 19 additions & 17 deletions pkg/kn/commands/domain/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,16 @@ import (
"errors"
"fmt"

"knative.dev/serving/pkg/apis/serving/v1alpha1"

"github.com/spf13/cobra"

knerrors "knative.dev/client/pkg/errors"
"knative.dev/client/pkg/kn/commands"
)

const MaxUpdateRetries = 3

// NewDomainMappingUpdateCommand to create event channels
func NewDomainMappingUpdateCommand(p *commands.KnParams) *cobra.Command {
var refFlags RefFlags
Expand All @@ -48,27 +52,25 @@ func NewDomainMappingUpdateCommand(p *commands.KnParams) *cobra.Command {
return err
}

toUpdate, err := client.GetDomainMapping(cmd.Context(), name)
if err != nil {
return err
}

if toUpdate.GetDeletionTimestamp() != nil {
return fmt.Errorf("can't update domain mapping '%s' because it has been marked for deletion", name)
}
updateFunc := func(toUpdate *v1alpha1.DomainMapping) (*v1alpha1.DomainMapping, error) {
if toUpdate.GetDeletionTimestamp() != nil {
return nil, fmt.Errorf("can't update domain mapping '%s' because it has been marked for deletion", name)
}

dynamicClient, err := p.NewDynamicClient(namespace)
if err != nil {
return err
}
dynamicClient, err := p.NewDynamicClient(namespace)
if err != nil {
return nil, err
}

reference, err := refFlags.Resolve(cmd.Context(), dynamicClient, namespace)
if err != nil {
return err
reference, err := refFlags.Resolve(cmd.Context(), dynamicClient, namespace)
if err != nil {
return nil, err
}
toUpdate.Spec.Ref = *reference
return toUpdate, nil
}
toUpdate.Spec.Ref = *reference

err = client.UpdateDomainMapping(cmd.Context(), toUpdate)
err = client.UpdateDomainMappingWithRetry(cmd.Context(), name, updateFunc, MaxUpdateRetries)
if err != nil {
return knerrors.GetError(err)
}
Expand Down
44 changes: 21 additions & 23 deletions pkg/kn/commands/source/container/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,11 @@ import (

"knative.dev/client/pkg/kn/commands"
v1 "knative.dev/client/pkg/sources/v1"
sourcesv1 "knative.dev/eventing/pkg/apis/sources/v1"
)

const MaxUpdateRetries = 3

// NewContainerUpdateCommand for managing source update
func NewContainerUpdateCommand(p *commands.KnParams) *cobra.Command {
var podFlags knflags.PodSpecFlags
Expand Down Expand Up @@ -60,35 +63,30 @@ func NewContainerUpdateCommand(p *commands.KnParams) *cobra.Command {
return err
}

source, err := srcClient.GetContainerSource(cmd.Context(), name)
if err != nil {
return err
}
if source.GetDeletionTimestamp() != nil {
return fmt.Errorf("can't update container source %s because it has been marked for deletion", name)
}

b := v1.NewContainerSourceBuilderFromExisting(source)
podSpec := b.Build().Spec.Template.Spec
err = podFlags.ResolvePodSpec(&podSpec, cmd.Flags(), os.Args)
if err != nil {
return fmt.Errorf(
"cannot update ContainerSource '%s' in namespace '%s' "+
"because: %s", name, namespace, err)
}
b.PodSpec(podSpec)

if cmd.Flags().Changed("sink") {
objectRef, err := sinkFlags.ResolveSink(cmd.Context(), dynamicClient, namespace)
updateFunc := func(source *sourcesv1.ContainerSource) (*sourcesv1.ContainerSource, error) {
b := v1.NewContainerSourceBuilderFromExisting(source)
podSpec := b.Build().Spec.Template.Spec
err = podFlags.ResolvePodSpec(&podSpec, cmd.Flags(), os.Args)
if err != nil {
return fmt.Errorf(
return nil, fmt.Errorf(
"cannot update ContainerSource '%s' in namespace '%s' "+
"because: %s", name, namespace, err)
}
b.Sink(*objectRef)
b.PodSpec(podSpec)

if cmd.Flags().Changed("sink") {
objectRef, err := sinkFlags.ResolveSink(cmd.Context(), dynamicClient, namespace)
if err != nil {
return nil, fmt.Errorf(
"cannot update ContainerSource '%s' in namespace '%s' "+
"because: %s", name, namespace, err)
}
b.Sink(*objectRef)
}
return b.Build(), nil
}

err = srcClient.UpdateContainerSource(cmd.Context(), b.Build())
err = srcClient.UpdateContainerSourceWithRetry(cmd.Context(), name, updateFunc, MaxUpdateRetries)
if err == nil {
fmt.Fprintf(cmd.OutOrStdout(), "Container source '%s' updated in namespace '%s'.\n", args[0], namespace)
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/kn/commands/source/ping/ping.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ import (
sourcesv1beta2 "knative.dev/eventing/pkg/client/clientset/versioned/typed/sources/v1beta2"
)

const (
MaxUpdateRetries = 3
)

// NewPingCommand is the root command for all Ping source related commands
func NewPingCommand(p *commands.KnParams) *cobra.Command {
pingImporterCmd := &cobra.Command{
Expand Down
Loading