Skip to content

Commit

Permalink
feat: Ability to retry trigger (#1090)
Browse files Browse the repository at this point in the history
* feat: Ability to retry trigger

Signed-off-by: Derek Wang <[email protected]>

* enhance backoff

Signed-off-by: Derek Wang <[email protected]>

* more logs

Signed-off-by: Derek Wang <[email protected]>

* comments change

Signed-off-by: Derek Wang <[email protected]>
  • Loading branch information
whynowy authored Mar 3, 2021
1 parent 35593c8 commit a97dd9a
Show file tree
Hide file tree
Showing 45 changed files with 1,127 additions and 487 deletions.
22 changes: 10 additions & 12 deletions api/openapi-spec/swagger.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 13 additions & 0 deletions api/sensor.html

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 13 additions & 0 deletions api/sensor.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

78 changes: 51 additions & 27 deletions common/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,25 @@ package common
import (
"time"

"github.com/pkg/errors"
apierr "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/util/wait"

apicommon "github.com/argoproj/argo-events/pkg/apis/common"
)

// DefaultRetry is a default retry backoff settings when retrying API calls
var DefaultRetry = wait.Backoff{
Steps: 5,
Duration: 1 * time.Second,
Factor: 1.0,
Jitter: 1,
}
var (
defaultFactor = apicommon.NewAmount("1.0")
defaultJitter = apicommon.NewAmount("1")
defaultDuration = apicommon.FromString("5s")

DefaultBackoff = apicommon.Backoff{
Steps: 5,
Duration: &defaultDuration,
Factor: &defaultFactor,
Jitter: &defaultJitter,
}
)

// IsRetryableKubeAPIError returns if the error is a retryable kubernetes error
func IsRetryableKubeAPIError(err error) bool {
Expand All @@ -42,38 +48,56 @@ func IsRetryableKubeAPIError(err error) bool {
return true
}

// GetConnectionBackoff returns a connection backoff option
func GetConnectionBackoff(backoff *apicommon.Backoff) *wait.Backoff {
result := wait.Backoff{
Duration: DefaultRetry.Duration,
Factor: DefaultRetry.Factor,
Jitter: DefaultRetry.Jitter,
Steps: DefaultRetry.Steps,
// Convert2WaitBackoff converts to a wait backoff option
func Convert2WaitBackoff(backoff *apicommon.Backoff) (*wait.Backoff, error) {
result := wait.Backoff{}

if backoff.Duration != nil {
result.Duration = time.Duration(backoff.Duration.Int64Value())
} else {
result.Duration = time.Duration(defaultDuration.Int64Value())
}
if backoff == nil {
return &result

factor := backoff.Factor
if factor == nil {
factor = &defaultFactor
}
f, err := factor.Float64()
if err != nil {
return nil, errors.Wrap(err, "invalid factor")
}
result.Factor = f

jitter := backoff.Jitter
if jitter == nil {
jitter = &defaultJitter
}
result.Duration = backoff.Duration
result.Factor, _ = backoff.Factor.Float64()
if backoff.Jitter != nil {
result.Jitter, _ = backoff.Jitter.Float64()
j, err := jitter.Float64()
if err != nil {
return nil, errors.Wrap(err, "invalid jitter")
}
if backoff.Steps != 0 {
result.Jitter = j

if backoff.Steps > 0 {
result.Steps = backoff.GetSteps()
} else {
result.Steps = int(DefaultBackoff.Steps)
}
return &result
return &result, nil
}

// Connect is a general connection helper
func Connect(backoff *wait.Backoff, conn func() error) error {
func Connect(backoff *apicommon.Backoff, conn func() error) error {
if backoff == nil {
backoff = &DefaultRetry
backoff = &DefaultBackoff
}
b, err := Convert2WaitBackoff(backoff)
if err != nil {
return errors.Wrap(err, "invalid backoff configuration")
}
err := wait.ExponentialBackoff(*backoff, func() (bool, error) {
return wait.ExponentialBackoff(*b, func() (bool, error) {
if err := conn(); err != nil {
return false, nil
}
return true, nil
})
return err
}
3 changes: 2 additions & 1 deletion eventbus/driver/nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,8 +271,9 @@ func (n *natsStreaming) processEventSourceMsg(m *stan.Msg, msgHolder *eventSourc
msgHolder.msgs[depName] = existingMsg
return
} else if m.Timestamp < existingMsg.timestamp {
// Redelivered old message, ack and return
// Re-delivered old message, ack and return
msgHolder.ackAndCache(m, event.ID())
log.Infow("Dropping this message becasue later ones also satisfy", "message", m)
return
}
}
Expand Down
23 changes: 13 additions & 10 deletions eventsources/eventing.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"github.com/google/uuid"
"github.com/pkg/errors"
"go.uber.org/zap"
"k8s.io/apimachinery/pkg/util/wait"

"github.com/argoproj/argo-events/common"
"github.com/argoproj/argo-events/common/logging"
Expand Down Expand Up @@ -273,7 +272,7 @@ func (e *EventSourceAdaptor) Start(ctx context.Context) error {
logger.Error("failed to get eventbus driver", zap.Error(err))
return err
}
if err = common.Connect(&common.DefaultRetry, func() error {
if err = common.Connect(&common.DefaultBackoff, func() error {
e.eventBusConn, err = driver.Connect()
return err
}); err != nil {
Expand Down Expand Up @@ -333,12 +332,16 @@ func (e *EventSourceAdaptor) Start(ctx context.Context) error {
e.metrics.IncRunningServices(s.GetEventSourceName())
defer e.metrics.DecRunningServices(s.GetEventSourceName())
defer wg.Done()
if err = common.Connect(&wait.Backoff{
duration := apicommon.FromString("1s")
factor := apicommon.NewAmount("1")
jitter := apicommon.NewAmount("30")
backoff := apicommon.Backoff{
Steps: 10,
Duration: 1 * time.Second,
Factor: 1,
Jitter: 30,
}, func() error {
Duration: &duration,
Factor: &factor,
Jitter: &jitter,
}
if err = common.Connect(&backoff, func() error {
return s.StartListening(cctx, func(data []byte) error {
event := cloudevents.NewEvent()
event.SetID(fmt.Sprintf("%x", uuid.New()))
Expand All @@ -358,13 +361,13 @@ func (e *EventSourceAdaptor) Start(ctx context.Context) error {
return errors.New("failed to publish event, eventbus connection closed")
}
if err = driver.Publish(e.eventBusConn, eventBody); err != nil {
logger.Error("failed to publish an event", zap.Error(err), zap.Any(logging.LabelEventName,
logger.Error("failed to publish an event", zap.Error(err), zap.String(logging.LabelEventName,
s.GetEventName()), zap.Any(logging.LabelEventSourceType, s.GetEventSourceType()))
e.metrics.EventSentFailed(s.GetEventSourceName(), s.GetEventName())
return err
}
logger.Info("succeeded to publish an event", zap.Error(err), zap.Any(logging.LabelEventName,
s.GetEventName()), zap.Any(logging.LabelEventSourceType, s.GetEventSourceType()))
logger.Info("succeeded to publish an event", zap.Error(err), zap.String(logging.LabelEventName,
s.GetEventName()), zap.Any(logging.LabelEventSourceType, s.GetEventSourceType()), zap.String("eventID", event.ID()))
e.metrics.EventSent(s.GetEventSourceName(), s.GetEventName())
return nil
})
Expand Down
2 changes: 1 addition & 1 deletion eventsources/persist/event_persist.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func (cmp *ConfigMapPersist) Save(event *Event) error {
return errors.Errorf("event object is nil")
}
//Using Connect util func for backoff retry if K8s API returns error
err := common.Connect(&common.DefaultRetry, func() error {
err := common.Connect(&common.DefaultBackoff, func() error {
cm, err := cmp.kubeClient.CoreV1().ConfigMaps(cmp.namespace).Get(cmp.ctx, cmp.name, metav1.GetOptions{})
if err != nil {
if apierr.IsNotFound(err) && cmp.createIfNotExist {
Expand Down
3 changes: 1 addition & 2 deletions eventsources/sources/amqp/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,8 @@ func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byt
defer sources.Recover(el.GetEventName())

amqpEventSource := &el.AMQPEventSource
backoff := common.GetConnectionBackoff(amqpEventSource.ConnectionBackoff)
var conn *amqplib.Connection
if err := common.Connect(backoff, func() error {
if err := common.Connect(amqpEventSource.ConnectionBackoff, func() error {
if amqpEventSource.TLS != nil {
tlsConfig, err := common.GetTLSConfig(amqpEventSource.TLS)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion eventsources/sources/emitter/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byt
log.Info("creating a client", zap.Any("channelName", emitterEventSource.ChannelName))
client := emitter.NewClient(options...)

if err := common.Connect(common.GetConnectionBackoff(emitterEventSource.ConnectionBackoff), func() error {
if err := common.Connect(emitterEventSource.ConnectionBackoff, func() error {
if err := client.Connect(); err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion eventsources/sources/kafka/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ func (el *EventListener) partitionConsumer(ctx context.Context, log *zap.Sugared
var consumer sarama.Consumer

log.Info("connecting to Kafka cluster...")
if err := common.Connect(common.GetConnectionBackoff(kafkaEventSource.ConnectionBackoff), func() error {
if err := common.Connect(kafkaEventSource.ConnectionBackoff, func() error {
var err error

config, err := getSaramaConfig(kafkaEventSource, log)
Expand Down
2 changes: 1 addition & 1 deletion eventsources/sources/mqtt/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byt
var client mqttlib.Client

log.Info("connecting to mqtt broker...")
if err := common.Connect(common.GetConnectionBackoff(mqttEventSource.ConnectionBackoff), func() error {
if err := common.Connect(mqttEventSource.ConnectionBackoff, func() error {
client = mqttlib.NewClient(opts)
if token := client.Connect(); token.Wait() && token.Error() != nil {
return token.Error()
Expand Down
2 changes: 1 addition & 1 deletion eventsources/sources/nats/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byt

var conn *natslib.Conn
log.Info("connecting to nats cluster...")
if err := common.Connect(common.GetConnectionBackoff(natsEventSource.ConnectionBackoff), func() error {
if err := common.Connect(natsEventSource.ConnectionBackoff, func() error {
var err error
if conn, err = natslib.Connect(natsEventSource.URL, opt...); err != nil {
return err
Expand Down
5 changes: 2 additions & 3 deletions eventsources/sources/nsq/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byt
config.TlsV1 = true
}

if err := common.Connect(common.GetConnectionBackoff(nsqEventSource.ConnectionBackoff), func() error {
if err := common.Connect(nsqEventSource.ConnectionBackoff, func() error {
var err error
if consumer, err = nsq.NewConsumer(nsqEventSource.Topic, nsqEventSource.Channel, config); err != nil {
return err
Expand All @@ -100,8 +100,7 @@ func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byt

consumer.AddHandler(&messageHandler{dispatch: dispatch, logger: log, isJSON: nsqEventSource.JSONBody, metadata: nsqEventSource.Metadata})

err := consumer.ConnectToNSQLookupd(nsqEventSource.HostAddress)
if err != nil {
if err := consumer.ConnectToNSQLookupd(nsqEventSource.HostAddress); err != nil {
return errors.Wrapf(err, "lookup failed for host %s for event source %s", nsqEventSource.HostAddress, el.GetEventName())
}

Expand Down
2 changes: 1 addition & 1 deletion eventsources/sources/pulsar/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func (el *EventListener) StartListening(ctx context.Context, dispatch func([]byt

var client pulsar.Client

if err := common.Connect(common.GetConnectionBackoff(pulsarEventSource.ConnectionBackoff), func() error {
if err := common.Connect(pulsarEventSource.ConnectionBackoff, func() error {
var err error
if client, err = pulsar.NewClient(clientOpt); err != nil {
return err
Expand Down
8 changes: 4 additions & 4 deletions examples/event-sources/amqp.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ spec:
# optional backoff time for connection retries.
# if not provided, default connection backoff time will be used.
connectionBackoff:
# duration in nanoseconds. following value is 10 seconds
duration: 10000000000
# duration in nanoseconds, or strings like "1s", "1m". following value is 10 seconds
duration: 10s
# how many backoffs
steps: 5
# factor to increase on each step.
Expand Down Expand Up @@ -71,8 +71,8 @@ spec:
# # optional backoff time for connection retries.
# # if not provided, default connection backoff time will be used.
# connectionBackoff:
# # duration in nanoseconds. following value is 10 seconds
# duration: 10000000000
# # duration in nanoseconds, or strings like "1s". following value is 10 seconds
# duration: 10s
# # how many backoffs
# steps: 5
# # factor to increase on each step.
Expand Down
4 changes: 2 additions & 2 deletions examples/event-sources/emitter.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ spec:
# optional backoff time for connection retries.
# if not provided, default connection backoff time will be used.
connectionBackoff:
# duration in nanoseconds. following value is 10 seconds.
duration: 10000000000
# duration in nanoseconds, or strings like "4s", "1m". following value is 10 seconds.
duration: 10s
# how many times you want to apply backoff.
steps: 5
# factor to increase on each step.
Expand Down
Loading

0 comments on commit a97dd9a

Please sign in to comment.