diff --git a/eventbus/driver/nats.go b/eventbus/driver/nats.go index ab05605c62..07af83899a 100644 --- a/eventbus/driver/nats.go +++ b/eventbus/driver/nats.go @@ -294,7 +294,9 @@ func (n *natsStreaming) processEventSourceMsg(m *stan.Msg, msgHolder *eventSourc // New message, set and check msgHolder.msgs[depName] = &eventSourceMessage{seq: m.Sequence, timestamp: m.Timestamp, event: event, lastDeliveredTime: now} msgHolder.parameters[depName] = true - msgHolder.latestGoodMsgTimestamp = m.Timestamp + if msgHolder.latestGoodMsgTimestamp < m.Timestamp { + msgHolder.latestGoodMsgTimestamp = m.Timestamp + } // Check if there's any stale message being held. // Stale message could be message age has been longer than NATS streaming max message age,