diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go index 91404adcf6..a180586215 100644 --- a/pulsar/consumer_test.go +++ b/pulsar/consumer_test.go @@ -589,6 +589,41 @@ func TestConsumerEventTime(t *testing.T) { assert.Equal(t, "test", string(msg.Payload())) } +func TestConsumerWithoutEventTime(t *testing.T) { + client, err := NewClient(ClientOptions{ + URL: lookupURL, + }) + + assert.Nil(t, err) + defer client.Close() + + topicName := "test-without-event-time" + ctx := context.Background() + + producer, err := client.CreateProducer(ProducerOptions{ + Topic: topicName, + }) + assert.Nil(t, err) + defer producer.Close() + + consumer, err := client.Subscribe(ConsumerOptions{ + Topic: topicName, + SubscriptionName: "sub-1", + }) + assert.Nil(t, err) + defer consumer.Close() + + _, err = producer.Send(ctx, &ProducerMessage{ + Payload: []byte("test"), + }) + assert.Nil(t, err) + + msg, err := consumer.Receive(ctx) + assert.Nil(t, err) + assert.Equal(t, int64(0), msg.EventTime().UnixNano()) + assert.Equal(t, "test", string(msg.Payload())) +} + func TestConsumerFlow(t *testing.T) { client, err := NewClient(ClientOptions{ URL: lookupURL, diff --git a/pulsar/message_test.go b/pulsar/message_test.go new file mode 100644 index 0000000000..aa660b9482 --- /dev/null +++ b/pulsar/message_test.go @@ -0,0 +1,30 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package pulsar + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestMessageZeroEventTime(t *testing.T) { + msg := &ProducerMessage{} + assert.Equal(t, false, msg.EventTime.UnixNano() == 0) + assert.Equal(t, true, msg.EventTime.IsZero()) +} diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index 210a929ce4..45d2aa9a7e 100644 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -548,7 +548,7 @@ func (p *partitionProducer) internalSend(request *sendRequest) { PayloadSize: proto.Int(len(payload)), } - if msg.EventTime.UnixNano() != 0 { + if !msg.EventTime.IsZero() { smm.EventTime = proto.Uint64(internal.TimestampMillis(msg.EventTime)) }