Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feat][cpp] Add async receive function to C API #17452

Merged
merged 5 commits into from
Sep 8, 2022

Conversation

equanz
Copy link
Contributor

@equanz equanz commented Sep 4, 2022

Master Issue: #14452

Motivation

I want to rewrite the pulsar-client-node receive method to rely exclusively on async operations rather than worker threads.
However, an async receive function is not defined currently in C API.

Modifications

  • Define pulsar_consumer_receive_async and pulsar_receive_callback to use async receive operation in C API

Verifying this change

  • Make sure that the change passes the CI checks.

(Please pick either of the following options)

This change is already covered by existing tests, such as C++ tests.

Does this pull request potentially affect one of the following parts:

If yes was chosen, please highlight the changes

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API: (yes)
    • typedef void (*pulsar_receive_callback)(pulsar_result result, pulsar_message_t *msg, void *ctx);
    • /**
      * Asynchronously receive a single message.
      *
      * This method will initiate the operation and return immediately. The provided callback
      * will be triggered when the operation is complete.
      *
      * @param callback callback that will be triggered when the message is available
      */
      PULSAR_PUBLIC void pulsar_consumer_receive_async(pulsar_consumer_t *consumer, pulsar_receive_callback callback,
  • The schema: (no)
  • The default values of configurations: (no)
  • The wire protocol: (no)
  • The rest endpoints: (no)
  • The admin cli options: (no)
  • Anything that affects deployment: (no)

Documentation

Check the box below or label this PR directly.

Need to update docs?

  • doc-not-needed

@github-actions github-actions bot added the doc-not-needed Your PR changes do not impact docs label Sep 4, 2022
@tisonkun
Copy link
Member

tisonkun commented Sep 4, 2022

@equanz
Copy link
Contributor Author

equanz commented Sep 4, 2022

/pulsarbot rerun-failure-checks

@BewareMyPower BewareMyPower added this to the 2.12.0 milestone Sep 4, 2022
@BewareMyPower BewareMyPower added type/feature The PR added a new feature or issue requested a new feature component/client-c++ labels Sep 4, 2022
Copy link
Contributor

@BewareMyPower BewareMyPower left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add a basic e2e test for demonstrate it works? I found currently there are nearly no C API related tests. But for new APIs, we should add some tests.

@equanz
Copy link
Contributor Author

equanz commented Sep 5, 2022

@BewareMyPower

Could you add a basic e2e test for demonstrate it works?

Okay. I'll address it.

@equanz equanz changed the title [feat][c] Add async receive function to C API [feat][cpp] Add async receive function to C API Sep 5, 2022
pulsar_message_t *received_msg;
receive_promise.getFuture().get(received_msg);
ASSERT_STREQ(content, static_cast<const char *>(pulsar_message_get_data(received_msg)));
pulsar_message_free(received_msg);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should release the received_msg after the callback is done. It's better to use pure C semantics here, i.e. use C functions instead of lambda, and do not use pulsar::Promise, we should define a C struct and pass it to ctx.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll modify it.

However, I also use pulsar::Promise to wait for the callback completion in the test thread.
To achieve this process, I'll use std::future instead of conditional wait, etc.

@BewareMyPower BewareMyPower merged commit df9b057 into apache:master Sep 8, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
doc-not-needed Your PR changes do not impact docs type/feature The PR added a new feature or issue requested a new feature
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants