diff --git a/eventbus/driver/nats.go b/eventbus/driver/nats.go index 8f905ab4ac..33687ff9e6 100644 --- a/eventbus/driver/nats.go +++ b/eventbus/driver/nats.go @@ -292,6 +292,9 @@ func (n *natsStreaming) processEventSourceMsg(m *stan.Msg, msgHolder *eventSourc // New message, set and check msgHolder.msgs[depName] = &eventSourceMessage{seq: m.Sequence, timestamp: m.Timestamp, event: event, lastDeliveredTime: now} msgHolder.parameters[depName] = true + if msgHolder.latestGoodMsgTimestamp < m.Timestamp { + msgHolder.latestGoodMsgTimestamp = m.Timestamp + } // Check if there's any stale message being held. // Stale message could be message age has been longer than NATS streaming max message age,