diff --git a/Dockerfile b/Dockerfile index 0c9e811646..3de1abb800 100644 --- a/Dockerfile +++ b/Dockerfile @@ -47,7 +47,7 @@ ENTRYPOINT [ "/bin/eventsource" ] #################################################################################################### # sensor #################################################################################################### -FROM alpine as sensor +FROM alpine:3.12.3 as sensor RUN apk update && apk upgrade && \ apk add --no-cache git diff --git a/eventsources/eventing.go b/eventsources/eventing.go index 608aecbf52..8cc5812eba 100644 --- a/eventsources/eventing.go +++ b/eventsources/eventing.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "fmt" + "math/rand" "strings" "sync" "time" @@ -262,7 +263,8 @@ func (e *EventSourceAdaptor) Start(ctx context.Context) error { logger := logging.FromContext(ctx).Desugar() logger.Info("Starting event source server...") servers := GetEventingServers(e.eventSource) - driver, err := eventbus.GetDriver(ctx, *e.eventBusConfig, e.eventBusSubject, strings.ReplaceAll(e.hostname, ".", "_")) + clientID := generateClientID(e.hostname) + driver, err := eventbus.GetDriver(ctx, *e.eventBusConfig, e.eventBusSubject, clientID) if err != nil { logger.Error("failed to get eventbus driver", zap.Error(err)) return err @@ -293,6 +295,13 @@ func (e *EventSourceAdaptor) Start(ctx context.Context) error { case <-ticker.C: if e.eventBusConn == nil || e.eventBusConn.IsClosed() { logger.Info("NATS connection lost, reconnecting...") + // Regenerate the client ID to avoid the issue that the NATS server still thinks the client is alive. 
+ clientID := generateClientID(e.hostname) + driver, err := eventbus.GetDriver(cctx, *e.eventBusConfig, e.eventBusSubject, clientID) + if err != nil { + logger.Error("failed to get eventbus driver during reconnection", zap.Error(err)) + continue + } e.eventBusConn, err = driver.Connect() if err != nil { logger.Error("failed to reconnect to eventbus", zap.Error(err)) @@ -382,3 +391,10 @@ func (e *EventSourceAdaptor) Start(ctx context.Context) error { } } } + +func generateClientID(hostname string) string { + s1 := rand.NewSource(time.Now().UnixNano()) + r1 := rand.New(s1) + clientID := fmt.Sprintf("client-%s-%v", strings.ReplaceAll(hostname, ".", "_"), r1.Intn(1000)) + return clientID +} diff --git a/sensors/listener.go b/sensors/listener.go index 8166b104c7..eb1428c75c 100644 --- a/sensors/listener.go +++ b/sensors/listener.go @@ -118,7 +118,7 @@ func (sensorCtx *SensorContext) ListenEvents(ctx context.Context) error { group, clientID := sensorCtx.getGroupAndClientID(depExpression) ebDriver, err := eventbus.GetDriver(cctx, *sensorCtx.EventBusConfig, sensorCtx.EventBusSubject, clientID) if err != nil { - logger.Error("failed to get event bus driver", zap.Error(err)) + logger.Error("failed to get eventbus driver", zap.Error(err)) return } triggerNames := []string{} @@ -176,7 +176,7 @@ func (sensorCtx *SensorContext) ListenEvents(ctx context.Context) error { err = ebDriver.SubscribeEventSources(cctx, conn, group, closeSubCh, depExpression, deps, filterFunc, actionFunc) if err != nil { - logger.Error("failed to subscribe to event bus", zap.Any("clientID", clientID), zap.Error(err)) + logger.Error("failed to subscribe to eventbus", zap.Any("clientID", clientID), zap.Error(err)) return } }() @@ -196,6 +196,13 @@ func (sensorCtx *SensorContext) ListenEvents(ctx context.Context) error { case <-ticker.C: if conn == nil || conn.IsClosed() { logger.Info("NATS connection lost, reconnecting...") + // Regenerate the client ID to avoid the issue that the NATS server still thinks the 
client is alive. + _, clientID := sensorCtx.getGroupAndClientID(depExpression) + ebDriver, err := eventbus.GetDriver(cctx, *sensorCtx.EventBusConfig, sensorCtx.EventBusSubject, clientID) + if err != nil { + logger.Error("failed to get eventbus driver during reconnection", zap.Error(err)) + continue + } conn, err = ebDriver.Connect() if err != nil { logger.Error("failed to reconnect to eventbus", zap.Any("clientID", clientID), zap.Error(err))