Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Need a way to attach customized information to Message #1054

Closed
sdojjy opened this issue Dec 20, 2022 · 1 comment · Fixed by #1059
Closed

Need a way to attach customized information to Message #1054

sdojjy opened this issue Dec 20, 2022 · 1 comment · Fixed by #1059
Assignees

Comments

@sdojjy
Copy link

sdojjy commented Dec 20, 2022

Describe the solution you would like
We are planning to migrate from sarama to kafka-go, but currently we rely heavily on the callback mechanism of sarama. We have found that the writer struct in kafka-go has a Completion field, but we cannot attach additional information like the sarama callback mechanism to determine which message it is.

	return &kafka.Writer{
		Compression: kafka.Gzip,
		ReadTimeout:  time.Second,
		WriteTimeout: time.Second,
		Addr:         kafka.TCP(kafkaURL),
		Topic:        topic,
		Balancer:     &LeastBytes{},
		Async:        true,
		RequiredAcks: kafka.RequireOne,
		Completion: func(messages []kafka.Message, err error) {
			
		},
	}

sarama has a field named Metadata

&sarama.ProducerMessage{
		Topic:     topic,
		Partition: partition,
		Key:       sarama.StringEncoder(message.Key),
		Value:     sarama.ByteEncoder(message.Value),
		Metadata:  messageMetaData{callback: message.Callback},
	}

Is there a way to do callback in kafka-go like sarama,or is there a planned enhancement for this?

A clear and concise description of what you want to happen.

Supporting documentation

Please provides links to relevant Kafka protocol docs and/or KIPs.

@3AceShowHand
Copy link
Contributor

3AceShowHand commented Jan 9, 2023

It looks like a new field named Metadata in the Message struct can be helpful for the application to confirm the message is delivered and perform some operations.

For example:

type Message struct {
  ... 
  // This field is used to hold arbitrary data you wish to include so it
  // will be available when handle it on the Writer's `Completion method`
  Metadata interface{}
}

and we can write the Completion method like:

Completion: func(messages []kafka.Message, err error) {
	if err != nil {
                 // handle error here
		return
	}

	for _, msg := range messages {
            if msg.Metadata != nil {
                meta := msg.Metadata.(messageMetaData)
		meta.callback()
            }	
	}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging a pull request may close this issue.

3 participants