diff --git a/eventbus/driver/nats.go b/eventbus/driver/nats.go index bd936f2965..664a37bed4 100644 --- a/eventbus/driver/nats.go +++ b/eventbus/driver/nats.go @@ -258,9 +258,7 @@ func (n *natsStreaming) processEventSourceMsg(m *stan.Msg, msgHolder *eventSourc return } - // Clean up old messages before starting a new round - - // ACK all the old messages after conditions are met + // Acknowledge any old messages that occurred either before the last reset if m.Timestamp/1e9 <= msgHolder.getLastResetTime() { log.Debugf("About to reset and ack dependency=%s due to message time occurred before reset, m.Timestamp=%d, msgHolder.getLastResetTime()=%d", depName, m.Timestamp, msgHolder.getLastResetTime())