Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Async commit does not work #132

Closed
wants to merge 3 commits into from
Closed

Conversation

thanpolas
Copy link
Contributor

I've experimented with manual committing and found that when using the async commit pattern of including the message and a callback the commit is simply never registered by kafka.

// Doesn't work
consumer.on('data', function(message) {
  consumer.commit(message, function(err) {
    console.log('not really committed');
  });
});

I've tested this against kafka 9.x, I could not setup a local dev environment for node-rdkafka as it failed on npm install with:

find: librdkafka/src-cpp: No such file or directory
gyp: Call to 'find librdkafka/src-cpp -name *.cpp' returned exit status 1 while in /Users/thanpolas/Projects/work/waldo/node-rdkafka/deps/librdkafka.gyp. while loading dependencies of binding.gyp while trying to load binding.gyp
gyp ERR! configure error

I'll see how this test case goes on CI and act accordingly, any tips to setup local dev env welcome

@thanpolas
Copy link
Contributor Author

Ok, i figured the local env stuff ( I needed to install the submodule). And thus I've managed to write a proper test case for the issue.

The unfortunate circumstance here is that consumer.committed() will happily report that the message has been committed although it actually is not.

@webmakersteve
Copy link
Contributor

Does the synchronous commit work in your test? They both use the same method under the hood, so I would imagine it doesn't.

@thanpolas
Copy link
Contributor Author

@webmakersteve no it does not, i just pushed one more commit with that test too

@webmakersteve
Copy link
Contributor

I think I remember this issue. Can you do this:

message.offset++

Then try to commit it?

@thanpolas
Copy link
Contributor Author

ha! it indeed worked with message.offset++ ... so what is up here? I think the fact that this works must be a side-effect of something else that happened here...

The message.offset is the exact offset i want to commit.

@webmakersteve
Copy link
Contributor

This was a bug I discussed a while back with @InfinitiesLoop. LibrdKafka behaves differently if given a TopicPartition object versus an RdKafka::Message object.

I am trying to replicate the functionality of the RdKafka::Message object commit without actually having an object to use, so I need to make a topic partition. But when topic partition is given, it expects the offset you commit to actually be the one you want to read from next, versus when you commit a message it assumes you are done with that offset, because you already have the message.

I need to add a separate method to distinguish between these two types of commits. Will add commitMessage and commitMessageSync

@webmakersteve
Copy link
Contributor

#133

@thanpolas
Copy link
Contributor Author

@webmakersteve just saw #133, how confident are you that adding one to the message offset will have the expected outcome? Are we taking into consideration the messages that might be in-flight?

I've fetched offsets 1, 2, 3. Message offset 1 has finished, 2 is still processing, i don't want to commit 2.

@thanpolas
Copy link
Contributor Author

I'm adding this to the conversation as well, it looks like very similar to this case IBM/sarama#705

@InfinitiesLoop
Copy link

It does seem counterintuitive, but it's really how it works. You commit the offset that is the next message you want to read, not the offset of the last message you processed. Librdkafka abstracts that fact, adding 1 to the committed offset when you commit a message. But it also has a method to commit an offset directly, which does not add 1. We are using that one, causing ambiguity with which the client intended. Adding a commitMessage or the like will address that. We should be sure and heavily document the difference.

@webmakersteve
Copy link
Contributor

Fixed with #133. Will update documentation in upcoming commit.

@webmakersteve
Copy link
Contributor

I would definitely not be against having the additional tests in the test suite. If you're okay rebasing your changes and updating this commit, you can submit it as a separate PR, or re-open this one and I'll merge it in.

@thanpolas
Copy link
Contributor Author

PR Continues on #137

@Jose123456
Copy link
Contributor

commit(message) still exhibits the old behavior i.e., it commits the previous message. Is this expected?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants