
Why is a message redelivered to the consumer after msg.DoubleAck returns no error? #1717

Open
alifpay opened this issue Sep 23, 2024 · 6 comments
Labels
defect Suspected defect such as a bug or regression

Comments

@alifpay

alifpay commented Sep 23, 2024

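Observed behavior

Messages from the PUSHBATCH work-queue stream are delivered to the consumer more than once, even though msg.DoubleAck returns no error. Stream, consumer, and Consume setup: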

streamBatch, err := s.js.NewStream(ctx, jetstream.StreamConfig{
	Name:      "PUSHBATCH",
	Subjects:  []string{"push.batch", "push.batch.retry"},
	MaxAge:    3 * 24 * time.Hour,
	Storage:   jetstream.FileStorage,
	Retention: jetstream.WorkQueuePolicy,
})
if err != nil {
	log.Fatalln("failed to create streamBatch: ", err)
}

consBatch, err := s.js.NewConsumer(ctx, streamBatch, jetstream.ConsumerConfig{
	Name:          "pushbatch",
	Durable:       "pushbatch",
	FilterSubject: "push.batch",
	AckWait:       50 * time.Second,
	AckPolicy:     jetstream.AckExplicitPolicy,
})
if err != nil {
	err = fmt.Errorf("failed to create consumer, %w", err)
	return err
}
_, err = consBatch.Consume(s.processBatch,
	jetstream.PullMaxMessages(1),
	jetstream.ConsumeErrHandler(func(consumeCtx jetstream.ConsumeContext, err error) {
		fmt.Println(err)
	}))
if err != nil {
	err = fmt.Errorf("failed to consume, %w", err)
	return err
}
_, err = consBatch.Consume(s.processBatch,
	jetstream.ConsumeErrHandler(func(consumeCtx jetstream.ConsumeContext, err error) {
		fmt.Println(err)
	}))
if err != nil {
	err = fmt.Errorf("failed to consume, %w", err)
	return err
}
_, err = consBatch.Consume(s.processBatch,
	jetstream.ConsumeErrHandler(func(consumeCtx jetstream.ConsumeContext, err error) {
		fmt.Println(err)
	}))
if err != nil {
	err = fmt.Errorf("failed to consume, %w", err)
	return err
}
_, err = consBatch.Consume(s.processBatch,
	jetstream.ConsumeErrHandler(func(consumeCtx jetstream.ConsumeContext, err error) {
		fmt.Println(err)
	}))
if err != nil {
	err = fmt.Errorf("failed to consume, %w", err)
	return err
}
_, err = consBatch.Consume(s.processBatch,
	jetstream.ConsumeErrHandler(func(consumeCtx jetstream.ConsumeContext, err error) {
		fmt.Println(err)
	}))
if err != nil {
	err = fmt.Errorf("failed to consume, %w", err)
	return err
}

Expected behavior

Each message should be delivered only once.

Server and client version

Client: github.com/nats-io/nats.go v1.37.0

Server: nats-server 2.10.20

Host environment

Running in Docker.

Steps to reproduce

No response

@alifpay alifpay added the defect Suspected defect such as a bug or regression label Sep 23, 2024
@alifpay
Author

alifpay commented Sep 23, 2024

Log output:
2024/09/22 14:12:26 batch message 641500 6 0 pushbatch 4501 1283
NumDelivered: 6
Sequence.Consumer: 4501
Sequence.Stream: 1283

Total messages: 2000

@alifpay
Author

alifpay commented Sep 23, 2024

Checking the database on every delivery to see whether the message was already processed would be expensive. How can I get each message delivered only once?

@Jarema
Member

Jarema commented Sep 23, 2024

Hey!

Your example does not show how you actually process the acks.
Additionally, Consume does not stop after a single batch is processed; under the hood it keeps polling with the configured batch settings. This means you have created several parallel processing callbacks. That is fine if you wanted to simulate many parallel apps consuming messages from one consumer. Just pointing that out :).

Regarding the double ack:
There is a known limitation: nats-io/nats-server#4786
If an ack is sent back after the ack_wait window has expired, the server will still accept it, even though a redelivery is already in flight.

@alifpay
Author

alifpay commented Sep 23, 2024

func (s *PushService) processBatch(msg jetstream.Msg) {
	var nt models.Notification
	err := pkg.JsonUnmarshal(msg.Data(), &nt)
	if err != nil {
		log.Println("failed to unmarshal message:", err, string(msg.Data()))
		// NOTE: returning without Ack/Nak/Term leaves the message unacked,
		// so the server redelivers it after AckWait.
		return
	}
	md, err := msg.Metadata()
	if err != nil {
		// md is nil when Metadata fails; dereferencing it below would panic.
		log.Println("failed to read metadata:", err)
		return
	}
	log.Println("batch message", nt.BatchId, err, md.NumDelivered, md.NumPending, md.Consumer, md.Sequence.Consumer, md.Sequence.Stream)
	vals := make([][]any, 0)
	code, resp, err := sendBatchFCM(nt)
	log.Println(nt.BatchId, code, err)
	if code == 406 {
		errStr := ""
		if err != nil {
			errStr = err.Error()
		}
		for _, r := range nt.Receivers {
			vals = append(vals, []any{nt.BatchId, r.Id, r.Token, errStr, code})
		}
		err = pkg.BulkInsert(s.ctx, s.db, "pushes", []string{"batch_id", "user_id", "token", "error", "status"}, vals)
		if err != nil {
			log.Println("failed pkg.BulkInsert:", err)
		}
		// DoubleAck waits for the server to confirm the ack.
		err1 := msg.DoubleAck(s.ctx)
		if err1 != nil {
			log.Println("failed to ack message:", err1, string(msg.Data()))
		}
		return
	} else if code > 0 {
		var sms = models.Notification{BatchId: nt.BatchId, Title: nt.Title, Message: nt.Message, Receivers: make([]models.Receiver, 0)}
		//todo retry for timeout and failed for 5xx
		errStr := ""
		for k, rs := range resp.Responses {
			if rs.Success {
				vals = append(vals, []any{nt.BatchId, nt.Receivers[k].Id, nt.Receivers[k].Token, rs.MessageID, 200})
			} else {
				if rs.Error != nil {
					errStr = fmt.Sprintf("%s %s", rs.Error, rs.MessageID)
				}
				sms.Receivers = append(sms.Receivers, nt.Receivers[k])
				vals = append(vals, []any{nt.BatchId, nt.Receivers[k].Id, nt.Receivers[k].Token, errStr, 417})
			}
		}
		err = pkg.BulkInsert(s.ctx, s.db, "pushes", []string{"batch_id", "user_id", "token", "error", "status"}, vals)
		if err != nil {
			log.Println("failed pkg.BulkInsert:", err)
		}
		if len(sms.Receivers) > 0 {
			bt, err := pkg.JsonMarshal(sms)
			if err != nil {
				log.Println("failed to marshal message:", err)
			} else {
				err = s.js.Publish(s.ctx, "sms.batch", bt)
				if err != nil {
					log.Println("failed to publish message:", err)
				}
			}
		}
		err1 := msg.DoubleAck(s.ctx)
		if err1 != nil {
			log.Println("failed to ack message:", err1, string(msg.Data()))
		}
	}
	// NOTE: when code <= 0 neither branch runs, so the message is never acked
	// and is redelivered after AckWait.
}

@alifpay
Author

alifpay commented Sep 23, 2024

About AckWait: it is longer than the HTTP timeout.
AckWait: 50 seconds
HTTP timeout: 30 seconds

I logged the response codes, and all of them are greater than 0.

@alifpay
Author

alifpay commented Sep 23, 2024

With Retention: jetstream.WorkQueuePolicy, the message should be deleted after it is acked,

but it was delivered more than once.


2 participants