Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Error: Failed to send message: AlreadyClosed #78

Closed
alijmlzd opened this issue Mar 1, 2020 · 25 comments · Fixed by #85
Closed

Error: Failed to send message: AlreadyClosed #78

alijmlzd opened this issue Mar 1, 2020 · 25 comments · Fixed by #85

Comments

@alijmlzd
Copy link

alijmlzd commented Mar 1, 2020

Hi there,
Why this example with 1 second timeout works fine:

const client = new Pulsar.Client({
    serviceUrl: 'pulsar://localhost:6650',
})

const producer = await client.createProducer({
    topic: 'persistent://public/default/my-topic',
})

setTimeout(() => {
    producer.send({
        data: Buffer.from('My Message'),
    })
}, 1000)

But this example with a few more seconds timeout occurs an error?

const client = new Pulsar.Client({
    serviceUrl: 'pulsar://localhost:6650',
})

const producer = await client.createProducer({
    topic: 'persistent://public/default/my-topic',
})

setTimeout(() => {
    producer.send({
        data: Buffer.from('My Message'),
    })
}, 9000)

Full error message:

You have triggered an unhandledRejection, you may have forgotten to catch a Promise rejection:
Error: Failed to send message: AlreadyClosed
@alijmlzd
Copy link
Author

alijmlzd commented Mar 4, 2020

Is there any kind of activity here to solve this issue?
To be more specific, I run pulsar server inside Docker like this as described here:

$ docker run -it \
  -p 6650:6650 \
  -p 8080:8080 \
  -v $PWD/data:/pulsar/data \
  apachepulsar/pulsar:2.0.0-rc1-incubating \
  bin/pulsar standalone

And here is the pulsar server logs:

18:33:03.492 [pulsar-io-50-4] INFO  org.apache.pulsar.broker.service.ServerCnx - New connection from /172.17.0.1:55506
18:33:03.607 [pulsar-io-50-4] INFO  org.apache.pulsar.broker.service.ServerCnx - [/172.17.0.1:55506][persistent://public/default/my-topic] Creating producer. producerId=0
18:33:03.722 [Thread-5] INFO  org.apache.pulsar.broker.service.ServerCnx - [/172.17.0.1:55506] persistent://public/default/my-topic configured with schema false
18:33:03.728 [Thread-5] INFO  org.apache.pulsar.broker.service.ServerCnx - [/172.17.0.1:55506] Created new producer: Producer{topic=PersistentTopic{topic=persistent://public/default/my-topic}, client=/172.17.0.1:55506, producerName=standalone-8-2, producerId=0}
18:33:19.431 [pulsar-io-50-4] INFO  org.apache.pulsar.broker.service.ServerCnx - Closed connection from /172.17.0.1:55506
18:33:20.480 [pulsar-web-57-1] INFO  org.eclipse.jetty.server.RequestLog - 127.0.0.1 - - [04/Mar/2020:18:33:20 +0000] "GET /admin/v2/persistent/public/functions/coordinate/stats HTTP/1.1" 200 882 "-" "Pulsar-Java-v2.5.0" 9

I see on log 18:33:19.431 there is a connection close after a few second from producer creation. so this is not clear enough for me that this is the client behavior or pulsar server.

@alexyans
Copy link

alexyans commented Mar 5, 2020

I have the same problem. Sending a message immediately after initializing the producer works fine, so it can't be a TLS handshake issue. The connection seems to timeout after a second or so.

@alexyans
Copy link

alexyans commented Mar 5, 2020

@alijmlzd did you try rolling back to an older version? Any luck finding one?

@alijmlzd
Copy link
Author

alijmlzd commented Mar 5, 2020

@alexyans No, but I will try that and share my result.
By the way, what version you recommend to start?

@alexyans
Copy link

alexyans commented Mar 5, 2020

I tried 2.4 of the client as it was the one recommended by Kafkaesque, but the problem was still there for me. Didn't bother going further back as this is unspecified behavior and most likely caused by the pulsar server, and I have no control of what version Kafkaesque uses. It appears that the server explicitly closes the connection for some reason.

For my use case, I worked around the problem by always creating the producer right before i emit a message. Flushing and closing after use should prevent leaks.

@alijmlzd
Copy link
Author

alijmlzd commented Mar 5, 2020

For my use case, I worked around the problem by always creating the producer right before I emit a message. Flushing and closing after use should prevent leaks.

Oh, as a quick fix its a good idea. but what if I have about more than 1000 send messages per second? Is it ok to create a producer, send and then flush and close the producer each time?

@alexyans
Copy link

alexyans commented Mar 6, 2020

That would be hard to tell without looking at the code. We know the client uses a connection pool for the original client connection, but it's not clear to me whether the producers/consumers create separate connections or piggyback on the one client connection. I'm not exactly sure what happens when you create a producer, it could be cheaper than it looks.

But yeah, I share your frustration. Hope it gets resolved soon.

@equanz
Copy link
Contributor

equanz commented Mar 12, 2020

It seems that setTimeout callback function run after pulsar client was destructed.
I think if use setTimeout in this case, should to use Promise and wait for timeout.

Here is an example.

const Pulsar = require("pulsar-client");

async function produceTest(timeout) {
  const client = new Pulsar.Client({
    serviceUrl: "pulsar://localhost:6650"
  });

  const producer = await client.createProducer({
    topic: "persistent://public/default/my-topic"
  });

  await new Promise((resolve, reject) => {
    setTimeout(() => {
      producer.send({
        data: Buffer.from("My Message")
      });
      resolve();
    }, timeout);
  });

  await client.close();
}

produceTest(1000);
produceTest(9000);

@alexyans
Copy link

@equanz your example handles the rejection, sure. But there shouldn't be a rejection in the first place. A producer closing after some timeout is unspecified behavior. It also drifts from the convention: a producer connection should remain open as long as you need it open. Otherwise there would be no need to explicitly close it as per the docs. If it closes itself no matter what you do, it is safe from leaks anyway.

@equanz
Copy link
Contributor

equanz commented Mar 13, 2020

As above, it seems that client has already been destructed when executing send after timeout ms. Therefore, it seems that the connection was closed by not Broker but client.

Here is the destructor implementation in C++ client(Node.js client wraps C++ client)

I think you need to avoid client destruction before producing.
(Therefore, in my example, await is put on setTimeout)

@sijie
Copy link
Member

sijie commented Mar 18, 2020

"AlreadyClosed" indicates that the client is already closed. You need to avoid client destruction.

See a producer example: http://pulsar.apache.org/docs/en/client-libraries-node/#producer-example

@yosiat
Copy link
Contributor

yosiat commented Mar 19, 2020

How I avoid client destruction?

In the following example, I create a producer and produce a message every 1 second.

const Pulsar = require('pulsar-client');

(async () => {
  // Create a client
  const client = new Pulsar.Client({
    serviceUrl: 'pulsar://localhost:6650'
  });

  // Create a producer
  const producer = await client.createProducer({
    topic: 'persistent://public/default/my-topic',
    sendTimeoutMs: 30000,
    batchingEnabled: true,
  });

  let i =0;
  async function produceMessage() {
    i = i + 1;

    const msg = `my-message-${i}`;
    const ret = await producer.send({
      data: Buffer.from(msg),
    });
    console.log(`Sent message: ${msg}`);
    console.log(ret);

    setTimeout(produceMessage, 1000);
  }

  produceMessage();
})();

Output:

2020-03-19 14:33:49.010 INFO  ConnectionPool:85 | Created connection for pulsar://localhost:6650
2020-03-19 14:33:49.011 INFO  ClientConnection:330 | [127.0.0.1:64239 -> 127.0.0.1:6650] Connected to broker
2020-03-19 14:33:49.111 INFO  BatchMessageContainer:43 | { BatchContainer [size = 0] [batchSizeInBytes_ = 0] [maxAllowedMessageBatchSizeInBytes_ = 131072] [maxAllowedNumMessagesInBatch_ = 1000] [topicName = persistent://public/default/my-topic] [producerName_ = ] [batchSizeInBytes_ = 0] [numberOfBatchesSent = 0] [averageBatchSize = 0]} BatchMessageContainer constructed
2020-03-19 14:33:49.111 INFO  HandlerBase:53 | [persistent://public/default/my-topic, ] Getting connection from pool
2020-03-19 14:33:49.155 INFO  ProducerImpl:151 | [persistent://public/default/my-topic, ] Created producer on broker [127.0.0.1:64239 -> 127.0.0.1:6650]
Sent message: my-message-1
null
Sent message: my-message-2
null
Sent message: my-message-3
null
Sent message: my-message-4
null
Sent message: my-message-5
null
Sent message: my-message-6
null
Sent message: my-message-7
null
Sent message: my-message-8
null
2020-03-19 14:33:57.128 INFO  ClientConnection:1349 | [127.0.0.1:64239 -> 127.0.0.1:6650] Connection closed
(node:37259) UnhandledPromiseRejectionWarning: Error: Failed to send message: AlreadyClosed
(node:37259) UnhandledPromiseRejectionWarning: Unhandled promise rejection. This error originated either by throwing inside of an async function without a catch block, or by rejecting a promise which was not handled with .catch(). (rejection id: 1)
(node:37259) [DEP0018] DeprecationWarning: Unhandled promise rejections are deprecated. In the future, promise rejections that are not handled will terminate the Node.js process with a non-zero exit code.
2020-03-19 14:33:57.312 INFO  ProducerImpl:472 | Producer - [persistent://public/default/my-topic, standalone-1-71] , [batchMessageContainer = { BatchContainer [size = 0] [batchSizeInBytes_ = 0] [maxAllowedMessageBatchSizeInBytes_ = 131072] [maxAllowedNumMessagesInBatch_ = 1000] [topicName = persistent://public/default/my-topic] [producerName_ = standalone-1-71] [batchSizeInBytes_ = 0] [numberOfBatchesSent = 8] [averageBatchSize = 1]}]
2020-03-19 14:33:57.312 INFO  BatchMessageContainer:170 | [numberOfBatchesSent = 8] [averageBatchSize = 1]
2020-03-19 14:33:57.312 INFO  ClientConnection:235 | [127.0.0.1:64239 -> 127.0.0.1:6650] Destroyed connection

@alexyans
Copy link

This is a playground example though. In a real system you can't necessarily control the rate of messages produced, as they correspond to real-time events. You could send mock keep-alive messages but it's ugly and a waste of resoures.

@yosiat
Copy link
Contributor

yosiat commented Mar 19, 2020

@alexyans yeah this is an example, but in real system, I want to use the same producer and publish messages every now and then. we have a system that publish a message every one minute, in this case I will need to create new producer every-time.

I think there is no ref maintaned to producer and this why nodejs decided to close the producer. I think we need to fix this.

@eaba
Copy link

eaba commented Mar 21, 2020

There are two sides to this, cause I experienced it!
1 - If your producer is actually being closed, have you tried Ping-ing the server every, lets say, 30s. Also try handling Ping command coming from the server.

2 - It might be that you created a producer with one connection and sending messages via another connection that is not meant for that producer - you will get alreadyclosed exception even if the producer is alive!

@yosiat
Copy link
Contributor

yosiat commented Mar 23, 2020

Hey!

I see you what you are writing here, but this is a bug in the client.
I don't need to implement any keepalives or ping-pong mechanism on my own.

Here is the go client, doing the same thing I did with my nodejs example (#78 (comment)) - https://gist.github.com/yosiat/97460fb4dee41c5d0d0f6873a31c77d9

The go code is producing all messages, while the node client is dying after some time.

the problem in my opinion that my function is maintaining a reference to the producer but no to the client which cause NodeJS/V8 to garage collect the client since it's not being referenced.
Changing my code to, instead of console.log(Sent message: ${msg}); to console.log(Sent message: ${msg} - client ${client}); fixes the issue for me.
IMO, But this is wrong.

@eaba my example has only one client and one producer.

@alexyans
Copy link

Changing my code to, instead of console.log(Sent message: ${msg}); to console.log(Sent message: ${msg} - client ${client}); fixes the issue for me.

okay if that's true, i think you're right about the root cause. @sijie how do you want to proceed?

@sijie
Copy link
Member

sijie commented Mar 24, 2020

@alexyans @yosiat I think we can let Producer object keep a reference to the client. So if your application keeps references to Producer, it should keep the reference to the client as well. So the client instance will not be destroyed.

@yosiat
Copy link
Contributor

yosiat commented Mar 24, 2020

@sijie sounds like a good approach, I tried my approach of creating napi ref, but couldn't find a way to implement it.

I will implement your approach and create a pull request for that.

@yosiat
Copy link
Contributor

yosiat commented Mar 25, 2020

@sijie tried out creating a reference to a client from the producer but it didn't help (tried with InstanceAccesor) and it created a weird dependency on the code which I didn't feel comfortable doing.

But I found out how to create a reference for a client and un-reference it when the client closes the connection.

This PR - #85 fixed the code I wrote above :)

@alexyans
Copy link

@yosiat thank you!!

@yosiat
Copy link
Contributor

yosiat commented Mar 26, 2020

@alexyans happy to help :)

@jbmusso
Copy link

jbmusso commented Sep 3, 2022

I'm new to Pulsar and I'm having the exact same issue, when running the official examples using:

  • pulsar-client: 1.6.2
  • libpulsar/2.10.1_1
  • Node 18.8.0
  • Apache Pulsar 2.10.1

Official producer.js example:

const Pulsar = require('pulsar-client');

(async () => {
  // Create a client
  const client = new Pulsar.Client({
    serviceUrl: 'pulsar://localhost:6650',
  });

  // Create a producer
  const producer = await client.createProducer({
    topic: 'my-topic',
  });

  // Send messages
  for (let i = 0; i < 10; i += 1) {
    const msg = `my-message-${i}`;
    producer.send({
      data: Buffer.from(msg),
    });
    console.log(`Sent message: ${msg}`);
  }
  await producer.flush();

  await producer.close();
  await client.close();
})();

Output of running node producer.js:

2022-09-03 21:29:38.915 INFO  [0x16f867000] ExecutorService:41 | Run io_service in a single thread
2022-09-03 21:29:38.916 INFO  [0x16dfc3000] ClientConnection:189 | [<none> -> pulsar://localhost:6650] Create ClientConnection, timeout=10000
2022-09-03 21:29:38.916 INFO  [0x16dfc3000] ConnectionPool:96 | Created connection for pulsar://localhost:6650
2022-09-03 21:29:38.918 WARN  [0x16f867000] ClientConnection:436 | [<none> -> pulsar://localhost:6650] Failed to establish connection: Connection refused
2022-09-03 21:29:38.919 INFO  [0x16f867000] ClientConnection:375 | [127.0.0.1:57853 -> 127.0.0.1:6650] Connected to broker
2022-09-03 21:29:38.956 INFO  [0x16f867000] HandlerBase:64 | [persistent://public/default/my-topic, ] Getting connection from pool
2022-09-03 21:29:39.013 INFO  [0x16f867000] ProducerImpl:189 | [persistent://public/default/my-topic, ] Created producer on broker [127.0.0.1:57853 -> 127.0.0.1:6650] 
Sent message: my-message-0
Sent message: my-message-1
Sent message: my-message-2
Sent message: my-message-3
Sent message: my-message-4
Sent message: my-message-5
Sent message: my-message-6
Sent message: my-message-7
Sent message: my-message-8
Sent message: my-message-9
2022-09-03 21:29:39.037 INFO  [0x16dfc3000] ProducerImpl:661 | [persistent://public/default/my-topic, standalone-0-54] Closing producer for topic persistent://public/default/my-topic
node:internal/process/promises:288
            triggerUncaughtException(err, true /* fromPromise */);
            ^

[Error: Failed to send message: AlreadyClosed]

Node.js v18.8.0
2022-09-03 21:29:39.038 INFO  [0x16f867000] ProducerImpl:704 | [persistent://public/default/my-topic, standalone-0-54] Closed producer

Environment info:

Output of brew info libpulsar

==> libpulsar: stable 2.10.1 (bottled)
Apache Pulsar C++ library
https://pulsar.apache.org/
/opt/homebrew/Cellar/libpulsar/2.10.1_1 (57 files, 14.4MB) *
  Poured from bottle on 2022-09-03 at 16:24:09
From: https://github.com/Homebrew/homebrew-core/blob/HEAD/Formula/libpulsar.rb
License: Apache-2.0
==> Dependencies
Build: cmake ✔, pkg-config ✔
Required: boost ✔, [email protected] ✔, protobuf ✔, snappy ✔, zstd ✔
==> Analytics
install: 250 (30 days), 898 (90 days), 3,403 (365 days)
install-on-request: 251 (30 days), 899 (90 days), 3,404 (365 days)
build-error: 1 (30 days)

I installed the C++ client with the installation instructions here: https://pulsar.apache.org/docs/client-libraries-cpp/#compilation

Let me know if I should open another issue. I don't know C++ but it appears that the patch added in #85 isn't in current master: https://github.com/apache/pulsar-client-node/blob/8893e47e87118f89ad324ccf5e60785eabc5b34d/src/Client.cc -- unsure if this is related or not.

edit - 11f28a6#diff-4c2ad2d5dd2477f5e7deebe4d839fbde8c00e36b38321cdb1d92c39c07fb070dL154 possibly removed the fixed introduced by #85, though the fix appears to still be in [email protected] (currently in my node_modules).

Thanks.

@k2la
Copy link
Contributor

k2la commented Sep 15, 2022

@jbmusso
I have same issue in my environment using your settings and sample code.
But no errors occur with master branch of pulsar-client-node.
Could you try with master branch?

@equanz
Copy link
Contributor

equanz commented Oct 14, 2022

@jbmusso
If batchingEnabled is false(default), Producer#flush does nothing.
From C++ v2.8.2, pending messages fail when Producer#close is called.
apache/pulsar#11570
https://github.com/apache/pulsar/blob/v2.8.2/pulsar-client-cpp/lib/ProducerImpl.cc#L609-L610

To fix it, for example, simply await Producer#send.

For more detail, please see: apache/pulsar-client-cpp#51

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging a pull request may close this issue.

8 participants