From 507e2fce83d37c6da462b64583b3477b9736945c Mon Sep 17 00:00:00 2001 From: Derek Wang Date: Wed, 12 Jan 2022 16:49:55 -0800 Subject: [PATCH] fix: conditions reset honors the latest timestamp of all unacked msgs Signed-off-by: Derek Wang --- eventbus/driver/nats.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/eventbus/driver/nats.go b/eventbus/driver/nats.go index ab05605c62..07af83899a 100644 --- a/eventbus/driver/nats.go +++ b/eventbus/driver/nats.go @@ -294,7 +294,9 @@ func (n *natsStreaming) processEventSourceMsg(m *stan.Msg, msgHolder *eventSourc // New message, set and check msgHolder.msgs[depName] = &eventSourceMessage{seq: m.Sequence, timestamp: m.Timestamp, event: event, lastDeliveredTime: now} msgHolder.parameters[depName] = true - msgHolder.latestGoodMsgTimestamp = m.Timestamp + if msgHolder.latestGoodMsgTimestamp < m.Timestamp { + msgHolder.latestGoodMsgTimestamp = m.Timestamp + } // Check if there's any stale message being held. // Stale message could be message age has been longer than NATS streaming max message age,