Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PulsarProxy does not always refresh authentication tokens received from client #10816

Closed
fmiguelez opened this issue Jun 3, 2021 · 20 comments · Fixed by #17831 or #20067
Closed

PulsarProxy does not always refresh authentication tokens received from client #10816

fmiguelez opened this issue Jun 3, 2021 · 20 comments · Fixed by #17831 or #20067
Assignees
Labels
type/bug The PR fixed a bug or issue reported a bug

Comments

@fmiguelez
Copy link
Contributor

Describe the bug
We have Pulsar deployed as a single cluster with a Pulsar Proxy, 3 brokers, 3 bookies and 3 zookeepers.

We use token authentication. Tokens expire every 5 minutes and they are refreshed 1 minute before they expire. We use Supplier interface in PulsarClient to provide a valid token.

The thing is that if we connect directly to brokers token refreshing works perfectly (we use a Docker Swarm and deploy broker as a service with multiple instances, so swarm manages balancing of connections). No ClientCnx erros are displayed (see #8922) and new Reader, Consumer or Producer can be created from same client instance.

Te problem comes when we use Pulsar Proxy. It seems that Pulsar Proxy keeps a connection pool against the broker while it manages connections from Pulsar Client. Connection pool against brokers eventually requires creating new connections, same as connections between Client and Proxy. Connection between Client and Proxy always has valid tokens, since when a new connection in pool is required Supplier in PulsarClient returns a currently valid token.

The connection between the proxy and the brokers seems to be another story as when at least 5 minutes have elapsed since first token was used for first Producer, Reader or Consumer new connections may fail between Proxy and brokers. But not in all cases.

Below are some logs from a custom test that consumes data from a topic using a Reader and produces data every 30 seconds (using a new Producer each time). The token is refreshed 2 times but after second one creating new Producer fails.

Look at RefreshableCredentialsSupplier logs that inform when a token is refreshed and after it is destroyed when was it last accessed by PulsarClient.

2021-06-03 12:36:50.014  INFO 12076 --- [subscriptionMessageSender-1] k.o.d.a.s.RefreshableCredentialsSupplier : New credentials: {clientId: etx, tokenHash: 2746ae06dde51c721854154092a1dab1, exp: 2021-06-03T10:40:57Z, life: PT4M6.986S}
2021-06-03 12:36:50.819  INFO 12076 --- [pulsar-client-io-8-1] o.a.pulsar.client.impl.ConnectionPool    : [[id: 0xd7e45848, L:/172.19.5.106:62072 - R:bio-qa-demo.ktc-onelab.lcl/172.24.9.126:6650]] Connected to server
2021-06-03 12:36:51.140  INFO 12076 --- [pulsar-client-io-8-1] o.a.p.c.impl.ConsumerStatsRecorderImpl   : Starting Pulsar consumer status recorder with config: {
2021-06-03 12:36:51.145  INFO 12076 --- [pulsar-client-io-8-1] o.a.p.c.impl.ConsumerStatsRecorderImpl   : Pulsar client config: {
  "serviceUrl" : "pulsar://bio-qa-demo.ktc-onelab.lcl:6650",
2021-06-03 12:36:51.400  INFO 12076 --- [pulsar-client-io-8-1] o.a.pulsar.client.impl.ConnectionPool    : [[id: 0x6c2f57c6, L:/172.19.5.106:62073 - R:bio-qa-demo.ktc-onelab.lcl/172.24.9.126:6650]] Connected to server
2021-06-03 12:36:51.401  INFO 12076 --- [pulsar-client-io-8-1] org.apache.pulsar.client.impl.ClientCnx  : [id: 0x6c2f57c6, L:/172.19.5.106:62073 - R:bio-qa-demo.ktc-onelab.lcl/172.24.9.126:6650] Connected through proxy to target broker at broker2:6650
2021-06-03 12:36:51.531  INFO 12076 --- [pulsar-client-io-8-1] o.a.pulsar.client.impl.ConsumerImpl      : [persistent://dbus/test/dummy-objects][reader-8d798953a5] Subscribing to topic on cnx [id: 0x6c2f57c6, L:/172.19.5.106:62073 - R:bio-qa-demo.ktc-onelab.lcl/172.24.9.126:6650], consumerId 0
2021-06-03 12:36:51.675  INFO 12076 --- [pulsar-client-io-8-1] o.a.pulsar.client.impl.ConsumerImpl      : [persistent://dbus/test/dummy-objects][reader-8d798953a5] Subscribed to topic on bio-qa-demo.ktc-onelab.lcl/172.24.9.126:6650 -- consumer: 0
2021-06-03 12:36:54.541  INFO 12076 --- [pulsar-client-io-8-1] o.a.pulsar.client.impl.ConsumerImpl      : [persistent://dbus/test/dummy-objects] [reader-8d798953a5] Closed consumer
2021-06-03 12:37:57.990  INFO 12076 --- [XNIO-1 task-9] o.a.pulsar.client.impl.PulsarClientImpl  : Client closing. URL: pulsar://bio-qa-demo.ktc-onelab.lcl:6650
2021-06-03 12:37:57.998  INFO 12076 --- [pulsar-client-io-8-1] org.apache.pulsar.client.impl.ClientCnx  : [id: 0x6c2f57c6, L:/172.19.5.106:62073 ! R:bio-qa-demo.ktc-onelab.lcl/172.24.9.126:6650] Disconnected
2021-06-03 12:37:58.002  INFO 12076 --- [pulsar-client-io-8-1] org.apache.pulsar.client.impl.ClientCnx  : [id: 0xd7e45848, L:/172.19.5.106:62072 ! R:bio-qa-demo.ktc-onelab.lcl/172.24.9.126:6650] Disconnected
2021-06-03 12:38:08.872  INFO 12076 --- [subscriptionMessageSender-2] k.o.d.a.s.RefreshableCredentialsSupplier : New credentials: {clientId: etx, tokenHash: 9a099eaee9325f97f332f8b83200bae5, exp: 2021-06-03T10:42:55Z, life: PT4M46.128S}
2021-06-03 12:38:09.057  INFO 12076 --- [pulsar-client-io-12-1] o.a.pulsar.client.impl.ConnectionPool    : [[id: 0xf2df9fcd, L:/172.19.5.106:62137 - R:bio-qa-demo.ktc-onelab.lcl/172.24.9.126:6650]] Connected to server
2021-06-03 12:38:09.299  INFO 12076 --- [pulsar-client-io-12-1] o.a.p.c.impl.ConsumerStatsRecorderImpl   : Starting Pulsar consumer status recorder with config: {
2021-06-03 12:38:09.303  INFO 12076 --- [pulsar-client-io-12-1] o.a.p.c.impl.ConsumerStatsRecorderImpl   : Pulsar client config: {
  "serviceUrl" : "pulsar://bio-qa-demo.ktc-onelab.lcl:6650",
2021-06-03 12:38:09.543  INFO 12076 --- [pulsar-client-io-12-1] o.a.pulsar.client.impl.ConnectionPool    : [[id: 0xb5aa0fd1, L:/172.19.5.106:62138 - R:bio-qa-demo.ktc-onelab.lcl/172.24.9.126:6650]] Connected to server
2021-06-03 12:38:09.543  INFO 12076 --- [pulsar-client-io-12-1] org.apache.pulsar.client.impl.ClientCnx  : [id: 0xb5aa0fd1, L:/172.19.5.106:62138 - R:bio-qa-demo.ktc-onelab.lcl/172.24.9.126:6650] Connected through proxy to target broker at broker1:6650
2021-06-03 12:38:09.675  INFO 12076 --- [pulsar-client-io-12-1] o.a.pulsar.client.impl.ConsumerImpl      : [persistent://dbus/test/dummy-objects][reader-bda41b7606] Subscribing to topic on cnx [id: 0xb5aa0fd1, L:/172.19.5.106:62138 - R:bio-qa-demo.ktc-onelab.lcl/172.24.9.126:6650], consumerId 0
2021-06-03 12:38:09.796  INFO 12076 --- [pulsar-client-io-12-1] o.a.pulsar.client.impl.ConsumerImpl      : [persistent://dbus/test/dummy-objects][reader-bda41b7606] Subscribed to topic on bio-qa-demo.ktc-onelab.lcl/172.24.9.126:6650 -- consumer: 0
2021-06-03 12:38:14.242  INFO 12076 --- [pulsar-client-io-12-1] o.a.p.c.impl.ProducerStatsRecorderImpl   : Starting Pulsar producer perf with config: {
2021-06-03 12:38:14.244  INFO 12076 --- [pulsar-client-io-12-1] o.a.p.c.impl.ProducerStatsRecorderImpl   : Pulsar client config: {
  "serviceUrl" : "pulsar://bio-qa-demo.ktc-onelab.lcl:6650",
2021-06-03 12:38:14.360  INFO 12076 --- [pulsar-client-io-12-1] o.a.pulsar.client.impl.ProducerImpl      : [persistent://dbus/test/dummy-objects] [null] Creating producer on cnx [id: 0xb5aa0fd1, L:/172.19.5.106:62138 - R:bio-qa-demo.ktc-onelab.lcl/172.24.9.126:6650]
2021-06-03 12:38:14.489  INFO 12076 --- [pulsar-client-io-12-1] o.a.pulsar.client.impl.ProducerImpl      : [persistent://dbus/test/dummy-objects] [databus-75-1417] Created producer on cnx [id: 0xb5aa0fd1, L:/172.19.5.106:62138 - R:bio-qa-demo.ktc-onelab.lcl/172.24.9.126:6650]
2021-06-03 12:38:14.550  WARN 12076 --- [pulsar-client-io-12-1] c.s.circe.checksum.Crc32cIntChecksum     : Failed to load Circe JNI library. Falling back to Java based CRC32c provider
2021-06-03 12:38:14.783  INFO 12076 --- [pulsar-client-io-12-1] o.a.pulsar.client.impl.ProducerImpl      : [persistent://dbus/test/dummy-objects] [databus-75-1417] Closed Producer
2021-06-03 12:38:46.093  INFO 12076 --- [pulsar-client-io-12-1] o.a.p.c.impl.ProducerStatsRecorderImpl   : Starting Pulsar producer perf with config: {
2021-06-03 12:38:46.096  INFO 12076 --- [pulsar-client-io-12-1] o.a.p.c.impl.ProducerStatsRecorderImpl   : Pulsar client config: {
  "serviceUrl" : "pulsar://bio-qa-demo.ktc-onelab.lcl:6650",
2021-06-03 12:38:46.209  INFO 12076 --- [pulsar-client-io-12-1] o.a.pulsar.client.impl.ProducerImpl      : [persistent://dbus/test/dummy-objects] [null] Creating producer on cnx [id: 0xb5aa0fd1, L:/172.19.5.106:62138 - R:bio-qa-demo.ktc-onelab.lcl/172.24.9.126:6650]
2021-06-03 12:38:46.325  INFO 12076 --- [pulsar-client-io-12-1] o.a.pulsar.client.impl.ProducerImpl      : [persistent://dbus/test/dummy-objects] [databus-75-1421] Created producer on cnx [id: 0xb5aa0fd1, L:/172.19.5.106:62138 - R:bio-qa-demo.ktc-onelab.lcl/172.24.9.126:6650]
2021-06-03 12:38:46.556  INFO 12076 --- [pulsar-client-io-12-1] o.a.pulsar.client.impl.ProducerImpl      : [persistent://dbus/test/dummy-objects] [databus-75-1421] Closed Producer
2021-06-03 12:39:09.307  INFO 12076 --- [pulsar-timer-15-1] o.a.p.c.impl.ConsumerStatsRecorderImpl   : [persistent://dbus/test/dummy-objects] [reader-bda41b7606] [11266] Prefetched messages: 0 --- Consume throughput received: 0,03 msgs/s --- 0,00 Mbit/s --- Ack sent rate: 0,00 ack/s --- Failed messages: 0 --- batch messages: 0 ---Failed acks: 0
2021-06-03 12:39:17.831  INFO 12076 --- [pulsar-client-io-12-1] o.a.p.c.impl.ProducerStatsRecorderImpl   : Starting Pulsar producer perf with config: {
2021-06-03 12:39:17.835  INFO 12076 --- [pulsar-client-io-12-1] o.a.p.c.impl.ProducerStatsRecorderImpl   : Pulsar client config: {
  "serviceUrl" : "pulsar://bio-qa-demo.ktc-onelab.lcl:6650",
2021-06-03 12:39:17.948  INFO 12076 --- [pulsar-client-io-12-1] o.a.pulsar.client.impl.ProducerImpl      : [persistent://dbus/test/dummy-objects] [null] Creating producer on cnx [id: 0xb5aa0fd1, L:/172.19.5.106:62138 - R:bio-qa-demo.ktc-onelab.lcl/172.24.9.126:6650]
2021-06-03 12:39:18.064  INFO 12076 --- [pulsar-client-io-12-1] o.a.pulsar.client.impl.ProducerImpl      : [persistent://dbus/test/dummy-objects] [databus-75-1422] Created producer on cnx [id: 0xb5aa0fd1, L:/172.19.5.106:62138 - R:bio-qa-demo.ktc-onelab.lcl/172.24.9.126:6650]
2021-06-03 12:39:18.296  INFO 12076 --- [pulsar-client-io-12-1] o.a.pulsar.client.impl.ProducerImpl      : [persistent://dbus/test/dummy-objects] [databus-75-1422] Closed Producer
2021-06-03 12:39:49.608  INFO 12076 --- [pulsar-client-io-12-1] o.a.p.c.impl.ProducerStatsRecorderImpl   : Starting Pulsar producer perf with config: {
2021-06-03 12:39:49.610  INFO 12076 --- [pulsar-client-io-12-1] o.a.p.c.impl.ProducerStatsRecorderImpl   : Pulsar client config: {
  "serviceUrl" : "pulsar://bio-qa-demo.ktc-onelab.lcl:6650",
2021-06-03 12:39:49.723  INFO 12076 --- [pulsar-client-io-12-1] o.a.pulsar.client.impl.ProducerImpl      : [persistent://dbus/test/dummy-objects] [null] Creating producer on cnx [id: 0xb5aa0fd1, L:/172.19.5.106:62138 - R:bio-qa-demo.ktc-onelab.lcl/172.24.9.126:6650]
2021-06-03 12:39:49.843  INFO 12076 --- [pulsar-client-io-12-1] o.a.pulsar.client.impl.ProducerImpl      : [persistent://dbus/test/dummy-objects] [databus-75-1423] Created producer on cnx [id: 0xb5aa0fd1, L:/172.19.5.106:62138 - R:bio-qa-demo.ktc-onelab.lcl/172.24.9.126:6650]
2021-06-03 12:39:50.069  INFO 12076 --- [pulsar-client-io-12-1] o.a.pulsar.client.impl.ProducerImpl      : [persistent://dbus/test/dummy-objects] [databus-75-1423] Closed Producer
2021-06-03 12:40:09.309  INFO 12076 --- [pulsar-timer-15-1] o.a.p.c.impl.ConsumerStatsRecorderImpl   : [persistent://dbus/test/dummy-objects] [reader-bda41b7606] [11266] Prefetched messages: 0 --- Consume throughput received: 0,03 msgs/s --- 0,00 Mbit/s --- Ack sent rate: 0,00 ack/s --- Failed messages: 0 --- batch messages: 0 ---Failed acks: 0
2021-06-03 12:40:21.639  INFO 12076 --- [pulsar-client-io-12-1] o.a.p.c.impl.ProducerStatsRecorderImpl   : Starting Pulsar producer perf with config: {
2021-06-03 12:40:21.642  INFO 12076 --- [pulsar-client-io-12-1] o.a.p.c.impl.ProducerStatsRecorderImpl   : Pulsar client config: {
  "serviceUrl" : "pulsar://bio-qa-demo.ktc-onelab.lcl:6650",
2021-06-03 12:40:21.754  INFO 12076 --- [pulsar-client-io-12-1] o.a.pulsar.client.impl.ProducerImpl      : [persistent://dbus/test/dummy-objects] [null] Creating producer on cnx [id: 0xb5aa0fd1, L:/172.19.5.106:62138 - R:bio-qa-demo.ktc-onelab.lcl/172.24.9.126:6650]
2021-06-03 12:40:21.870  INFO 12076 --- [pulsar-client-io-12-1] o.a.pulsar.client.impl.ProducerImpl      : [persistent://dbus/test/dummy-objects] [databus-75-1424] Created producer on cnx [id: 0xb5aa0fd1, L:/172.19.5.106:62138 - R:bio-qa-demo.ktc-onelab.lcl/172.24.9.126:6650]
2021-06-03 12:40:22.109  INFO 12076 --- [pulsar-client-io-12-1] o.a.pulsar.client.impl.ProducerImpl      : [persistent://dbus/test/dummy-objects] [databus-75-1424] Closed Producer
2021-06-03 12:40:53.392  INFO 12076 --- [pulsar-client-io-12-1] o.a.p.c.impl.ProducerStatsRecorderImpl   : Starting Pulsar producer perf with config: {
2021-06-03 12:40:53.395  INFO 12076 --- [pulsar-client-io-12-1] o.a.p.c.impl.ProducerStatsRecorderImpl   : Pulsar client config: {
  "serviceUrl" : "pulsar://bio-qa-demo.ktc-onelab.lcl:6650",
2021-06-03 12:40:53.507  INFO 12076 --- [pulsar-client-io-12-1] o.a.pulsar.client.impl.ProducerImpl      : [persistent://dbus/test/dummy-objects] [null] Creating producer on cnx [id: 0xb5aa0fd1, L:/172.19.5.106:62138 - R:bio-qa-demo.ktc-onelab.lcl/172.24.9.126:6650]
2021-06-03 12:40:53.625  INFO 12076 --- [pulsar-client-io-12-1] o.a.pulsar.client.impl.ProducerImpl      : [persistent://dbus/test/dummy-objects] [databus-75-1425] Created producer on cnx [id: 0xb5aa0fd1, L:/172.19.5.106:62138 - R:bio-qa-demo.ktc-onelab.lcl/172.24.9.126:6650]
2021-06-03 12:40:53.855  INFO 12076 --- [pulsar-client-io-12-1] o.a.pulsar.client.impl.ProducerImpl      : [persistent://dbus/test/dummy-objects] [databus-75-1425] Closed Producer
2021-06-03 12:41:09.311  INFO 12076 --- [pulsar-timer-15-1] o.a.p.c.impl.ConsumerStatsRecorderImpl   : [persistent://dbus/test/dummy-objects] [reader-bda41b7606] [11266] Prefetched messages: 0 --- Consume throughput received: 0,03 msgs/s --- 0,00 Mbit/s --- Ack sent rate: 0,00 ack/s --- Failed messages: 0 --- batch messages: 0 ---Failed acks: 0
2021-06-03 12:41:25.219  INFO 12076 --- [pulsar-client-io-12-1] o.a.p.c.impl.ProducerStatsRecorderImpl   : Starting Pulsar producer perf with config: {
2021-06-03 12:41:25.223  INFO 12076 --- [pulsar-client-io-12-1] o.a.p.c.impl.ProducerStatsRecorderImpl   : Pulsar client config: {
  "serviceUrl" : "pulsar://bio-qa-demo.ktc-onelab.lcl:6650",
2021-06-03 12:41:25.346  INFO 12076 --- [pulsar-client-io-12-1] o.a.pulsar.client.impl.ProducerImpl      : [persistent://dbus/test/dummy-objects] [null] Creating producer on cnx [id: 0xb5aa0fd1, L:/172.19.5.106:62138 - R:bio-qa-demo.ktc-onelab.lcl/172.24.9.126:6650]
2021-06-03 12:41:25.466  INFO 12076 --- [pulsar-client-io-12-1] o.a.pulsar.client.impl.ProducerImpl      : [persistent://dbus/test/dummy-objects] [databus-75-1429] Created producer on cnx [id: 0xb5aa0fd1, L:/172.19.5.106:62138 - R:bio-qa-demo.ktc-onelab.lcl/172.24.9.126:6650]
2021-06-03 12:41:25.699  INFO 12076 --- [pulsar-client-io-12-1] o.a.pulsar.client.impl.ProducerImpl      : [persistent://dbus/test/dummy-objects] [databus-75-1429] Closed Producer
2021-06-03 12:41:56.976  INFO 12076 --- [XNIO-1 task-19] k.o.d.a.s.RefreshableCredentialsSupplier : New credentials: {clientId: etx, tokenHash: 5b90493918766fa0b23fcc63aa93455b, exp: 2021-06-03T10:46:56Z, life: PT4M59.024S}
2021-06-03 12:41:57.161  INFO 12076 --- [pulsar-client-io-16-1] o.a.pulsar.client.impl.ConnectionPool    : [[id: 0x570ca8a3, L:/172.19.5.106:59319 - R:bio-qa-demo.ktc-onelab.lcl/172.24.9.126:6650]] Connected to server
2021-06-03 12:41:57.547  INFO 12076 --- [pulsar-client-io-16-1] o.a.p.c.impl.ProducerStatsRecorderImpl   : Starting Pulsar producer perf with config: {
2021-06-03 12:41:57.550  INFO 12076 --- [pulsar-client-io-16-1] o.a.p.c.impl.ProducerStatsRecorderImpl   : Pulsar client config: {
  "serviceUrl" : "pulsar://bio-qa-demo.ktc-onelab.lcl:6650",
2021-06-03 12:41:57.785  INFO 12076 --- [pulsar-client-io-16-1] o.a.pulsar.client.impl.ConnectionPool    : [[id: 0xb64f1099, L:/172.19.5.106:59320 - R:bio-qa-demo.ktc-onelab.lcl/172.24.9.126:6650]] Connected to server
2021-06-03 12:41:57.786  INFO 12076 --- [pulsar-client-io-16-1] org.apache.pulsar.client.impl.ClientCnx  : [id: 0xb64f1099, L:/172.19.5.106:59320 - R:bio-qa-demo.ktc-onelab.lcl/172.24.9.126:6650] Connected through proxy to target broker at broker1:6650
2021-06-03 12:41:57.917  INFO 12076 --- [pulsar-client-io-16-1] o.a.pulsar.client.impl.ProducerImpl      : [persistent://dbus/test/dummy-objects] [null] Creating producer on cnx [id: 0xb64f1099, L:/172.19.5.106:59320 - R:bio-qa-demo.ktc-onelab.lcl/172.24.9.126:6650]
2021-06-03 12:41:58.036  INFO 12076 --- [pulsar-client-io-16-1] o.a.pulsar.client.impl.ProducerImpl      : [persistent://dbus/test/dummy-objects] [databus-75-1431] Created producer on cnx [id: 0xb64f1099, L:/172.19.5.106:59320 - R:bio-qa-demo.ktc-onelab.lcl/172.24.9.126:6650]
2021-06-03 12:41:58.267  INFO 12076 --- [pulsar-client-io-16-1] o.a.pulsar.client.impl.ProducerImpl      : [persistent://dbus/test/dummy-objects] [databus-75-1431] Closed Producer
2021-06-03 12:41:58.699  INFO 12076 --- [clientInboundChannel-33] k.o.d.a.s.RefreshableCredentialsSupplier : Credentials refreshed: {old: {clientId: etx, tokenHash: 9a099eaee9325f97f332f8b83200bae5, exp: 2021-06-03T10:42:55Z, life: PT56.301S}, new: {clientId: etx, tokenHash: 5b90493918766fa0b23fcc63aa93455b, exp: **2021-06-03T10:46:56Z**, life: PT4M57.301S}}
2021-06-03 12:41:58.700  INFO 12076 --- [clientInboundChannel-33] k.o.d.a.s.RefreshableCredentialsSupplier : Supplier deleted: {lastSupply: {moment: 2021-06-03T10:41:57.786Z, creds: {clientId: etx, tokenHash: 5b90493918766fa0b23fcc63aa93455b, exp: **2021-06-03T10:46:56Z**, life: PT4M57.3S}}, curCreds: {clientId: etx, tokenHash: 5b90493918766fa0b23fcc63aa93455b, exp: **2021-06-03T10:46:56Z**, life: PT4M57.3S}, prevCreds: <NONE>}
2021-06-03 12:42:09.313  INFO 12076 --- [pulsar-timer-15-1] o.a.p.c.impl.ConsumerStatsRecorderImpl   : [persistent://dbus/test/dummy-objects] [reader-bda41b7606] [11266] Prefetched messages: 0 --- Consume throughput received: 0,03 msgs/s --- 0,00 Mbit/s --- Ack sent rate: 0,00 ack/s --- Failed messages: 0 --- batch messages: 0 ---Failed acks: 0
2021-06-03 12:42:29.547  INFO 12076 --- [pulsar-client-io-16-1] o.a.p.c.impl.ProducerStatsRecorderImpl   : Starting Pulsar producer perf with config: {
2021-06-03 12:42:29.549  INFO 12076 --- [pulsar-client-io-16-1] o.a.p.c.impl.ProducerStatsRecorderImpl   : Pulsar client config: {
  "serviceUrl" : "pulsar://bio-qa-demo.ktc-onelab.lcl:6650",
2021-06-03 12:42:29.661  INFO 12076 --- [pulsar-client-io-16-1] o.a.pulsar.client.impl.ProducerImpl      : [persistent://dbus/test/dummy-objects] [null] Creating producer on cnx [id: 0xb64f1099, L:/172.19.5.106:59320 - R:bio-qa-demo.ktc-onelab.lcl/172.24.9.126:6650]
2021-06-03 12:42:29.776  INFO 12076 --- [pulsar-client-io-16-1] o.a.pulsar.client.impl.ProducerImpl      : [persistent://dbus/test/dummy-objects] [databus-75-1432] Created producer on cnx [id: 0xb64f1099, L:/172.19.5.106:59320 - R:bio-qa-demo.ktc-onelab.lcl/172.24.9.126:6650]
2021-06-03 12:42:30.002  INFO 12076 --- [pulsar-client-io-16-1] o.a.pulsar.client.impl.ProducerImpl      : [persistent://dbus/test/dummy-objects] [databus-75-1432] Closed Producer
2021-06-03 12:43:01.166  INFO 12076 --- [pulsar-client-io-16-1] o.a.p.c.impl.ProducerStatsRecorderImpl   : Starting Pulsar producer perf with config: {
2021-06-03 12:43:01.168  INFO 12076 --- [pulsar-client-io-16-1] o.a.p.c.impl.ProducerStatsRecorderImpl   : Pulsar client config: {
  "serviceUrl" : "pulsar://bio-qa-demo.ktc-onelab.lcl:6650",
2021-06-03 12:43:01.286  INFO 12076 --- [pulsar-client-io-16-1] o.a.pulsar.client.impl.ProducerImpl      : [persistent://dbus/test/dummy-objects] [null] Creating producer on cnx [id: 0xb64f1099, L:/172.19.5.106:59320 - R:bio-qa-demo.ktc-onelab.lcl/172.24.9.126:6650]
2021-06-03 12:43:01.401  INFO 12076 --- [pulsar-client-io-16-1] o.a.pulsar.client.impl.ProducerImpl      : [persistent://dbus/test/dummy-objects] [databus-75-1433] Created producer on cnx [id: 0xb64f1099, L:/172.19.5.106:59320 - R:bio-qa-demo.ktc-onelab.lcl/172.24.9.126:6650]
2021-06-03 12:43:01.627  INFO 12076 --- [pulsar-client-io-16-1] o.a.pulsar.client.impl.ProducerImpl      : [persistent://dbus/test/dummy-objects] [databus-75-1433] Closed Producer
2021-06-03 12:43:09.316  INFO 12076 --- [pulsar-timer-15-1] o.a.p.c.impl.ConsumerStatsRecorderImpl   : [persistent://dbus/test/dummy-objects] [reader-bda41b7606] [11266] Prefetched messages: 0 --- Consume throughput received: 0,03 msgs/s --- 0,00 Mbit/s --- Ack sent rate: 0,00 ack/s --- Failed messages: 0 --- batch messages: 0 ---Failed acks: 0
2021-06-03 12:43:32.866  INFO 12076 --- [pulsar-client-io-16-1] o.a.p.c.impl.ProducerStatsRecorderImpl   : Starting Pulsar producer perf with config: {
2021-06-03 12:43:32.868  INFO 12076 --- [pulsar-client-io-16-1] o.a.p.c.impl.ProducerStatsRecorderImpl   : Pulsar client config: {
  "serviceUrl" : "pulsar://bio-qa-demo.ktc-onelab.lcl:6650",
2021-06-03 12:43:32.980  INFO 12076 --- [pulsar-client-io-16-1] o.a.pulsar.client.impl.ProducerImpl      : [persistent://dbus/test/dummy-objects] [null] Creating producer on cnx [id: 0xb64f1099, L:/172.19.5.106:59320 - R:bio-qa-demo.ktc-onelab.lcl/172.24.9.126:6650]
2021-06-03 12:43:33.095  INFO 12076 --- [pulsar-client-io-16-1] o.a.pulsar.client.impl.ProducerImpl      : [persistent://dbus/test/dummy-objects] [databus-75-1434] Created producer on cnx [id: 0xb64f1099, L:/172.19.5.106:59320 - R:bio-qa-demo.ktc-onelab.lcl/172.24.9.126:6650]
2021-06-03 12:43:33.329  INFO 12076 --- [pulsar-client-io-16-1] o.a.pulsar.client.impl.ProducerImpl      : [persistent://dbus/test/dummy-objects] [databus-75-1434] Closed Producer
2021-06-03 12:44:04.632  INFO 12076 --- [pulsar-client-io-16-1] o.a.p.c.impl.ProducerStatsRecorderImpl   : Starting Pulsar producer perf with config: {
2021-06-03 12:44:04.636  INFO 12076 --- [pulsar-client-io-16-1] o.a.p.c.impl.ProducerStatsRecorderImpl   : Pulsar client config: {
  "serviceUrl" : "pulsar://bio-qa-demo.ktc-onelab.lcl:6650",
2021-06-03 12:44:04.751  INFO 12076 --- [pulsar-client-io-16-1] o.a.pulsar.client.impl.ProducerImpl      : [persistent://dbus/test/dummy-objects] [null] Creating producer on cnx [id: 0xb64f1099, L:/172.19.5.106:59320 - R:bio-qa-demo.ktc-onelab.lcl/172.24.9.126:6650]
2021-06-03 12:44:04.867  INFO 12076 --- [pulsar-client-io-16-1] o.a.pulsar.client.impl.ProducerImpl      : [persistent://dbus/test/dummy-objects] [databus-75-1435] Created producer on cnx [id: 0xb64f1099, L:/172.19.5.106:59320 - R:bio-qa-demo.ktc-onelab.lcl/172.24.9.126:6650]
2021-06-03 12:44:05.103  INFO 12076 --- [pulsar-client-io-16-1] o.a.pulsar.client.impl.ProducerImpl      : [persistent://dbus/test/dummy-objects] [databus-75-1435] Closed Producer
2021-06-03 12:44:09.318  INFO 12076 --- [pulsar-timer-15-1] o.a.p.c.impl.ConsumerStatsRecorderImpl   : [persistent://dbus/test/dummy-objects] [reader-bda41b7606] [11266] Prefetched messages: 0 --- Consume throughput received: 0,03 msgs/s --- 0,00 Mbit/s --- Ack sent rate: 0,00 ack/s --- Failed messages: 0 --- batch messages: 0 ---Failed acks: 0
2021-06-03 12:44:36.616  INFO 12076 --- [pulsar-client-io-16-1] o.a.p.c.impl.ProducerStatsRecorderImpl   : Starting Pulsar producer perf with config: {
2021-06-03 12:44:36.618  INFO 12076 --- [pulsar-client-io-16-1] o.a.p.c.impl.ProducerStatsRecorderImpl   : Pulsar client config: {
  "serviceUrl" : "pulsar://bio-qa-demo.ktc-onelab.lcl:6650",
2021-06-03 12:44:36.732  INFO 12076 --- [pulsar-client-io-16-1] o.a.pulsar.client.impl.ProducerImpl      : [persistent://dbus/test/dummy-objects] [null] Creating producer on cnx [id: 0xb64f1099, L:/172.19.5.106:59320 - R:bio-qa-demo.ktc-onelab.lcl/172.24.9.126:6650]
2021-06-03 12:44:36.849  INFO 12076 --- [pulsar-client-io-16-1] o.a.pulsar.client.impl.ProducerImpl      : [persistent://dbus/test/dummy-objects] [databus-75-1440] Created producer on cnx [id: 0xb64f1099, L:/172.19.5.106:59320 - R:bio-qa-demo.ktc-onelab.lcl/172.24.9.126:6650]
2021-06-03 12:44:37.079  INFO 12076 --- [pulsar-client-io-16-1] o.a.pulsar.client.impl.ProducerImpl      : [persistent://dbus/test/dummy-objects] [databus-75-1440] Closed Producer
2021-06-03 12:45:08.346  INFO 12076 --- [pulsar-client-io-16-1] o.a.p.c.impl.ProducerStatsRecorderImpl   : Starting Pulsar producer perf with config: {
2021-06-03 12:45:08.353  INFO 12076 --- [pulsar-client-io-16-1] o.a.p.c.impl.ProducerStatsRecorderImpl   : Pulsar client config: {
  "serviceUrl" : "pulsar://bio-qa-demo.ktc-onelab.lcl:6650",
2021-06-03 12:45:08.465  INFO 12076 --- [pulsar-client-io-16-1] o.a.pulsar.client.impl.ProducerImpl      : [persistent://dbus/test/dummy-objects] [null] Creating producer on cnx [id: 0xb64f1099, L:/172.19.5.106:59320 - R:bio-qa-demo.ktc-onelab.lcl/172.24.9.126:6650]
2021-06-03 12:45:08.580  INFO 12076 --- [pulsar-client-io-16-1] o.a.pulsar.client.impl.ProducerImpl      : [persistent://dbus/test/dummy-objects] [databus-75-1441] Created producer on cnx [id: 0xb64f1099, L:/172.19.5.106:59320 - R:bio-qa-demo.ktc-onelab.lcl/172.24.9.126:6650]
2021-06-03 12:45:08.804  INFO 12076 --- [pulsar-client-io-16-1] o.a.pulsar.client.impl.ProducerImpl      : [persistent://dbus/test/dummy-objects] [databus-75-1441] Closed Producer
2021-06-03 12:45:09.321  INFO 12076 --- [pulsar-timer-15-1] o.a.p.c.impl.ConsumerStatsRecorderImpl   : [persistent://dbus/test/dummy-objects] [reader-bda41b7606] [11266] Prefetched messages: 0 --- Consume throughput received: 0,03 msgs/s --- 0,00 Mbit/s --- Ack sent rate: 0,00 ack/s --- Failed messages: 0 --- batch messages: 0 ---Failed acks: 0
2021-06-03 12:45:40.071  INFO 12076 --- [pulsar-client-io-16-1] o.a.p.c.impl.ProducerStatsRecorderImpl   : Starting Pulsar producer perf with config: {
2021-06-03 12:45:40.073  INFO 12076 --- [pulsar-client-io-16-1] o.a.p.c.impl.ProducerStatsRecorderImpl   : Pulsar client config: {
  "serviceUrl" : "pulsar://bio-qa-demo.ktc-onelab.lcl:6650",
2021-06-03 12:45:40.185  INFO 12076 --- [pulsar-client-io-16-1] o.a.pulsar.client.impl.ProducerImpl      : [persistent://dbus/test/dummy-objects] [null] Creating producer on cnx [id: 0xb64f1099, L:/172.19.5.106:59320 - R:bio-qa-demo.ktc-onelab.lcl/172.24.9.126:6650]
2021-06-03 12:45:40.301  INFO 12076 --- [pulsar-client-io-16-1] o.a.pulsar.client.impl.ProducerImpl      : [persistent://dbus/test/dummy-objects] [databus-75-1442] Created producer on cnx [id: 0xb64f1099, L:/172.19.5.106:59320 - R:bio-qa-demo.ktc-onelab.lcl/172.24.9.126:6650]
2021-06-03 12:45:40.524  INFO 12076 --- [pulsar-client-io-16-1] o.a.pulsar.client.impl.ProducerImpl      : [persistent://dbus/test/dummy-objects] [databus-75-1442] Closed Producer
2021-06-03 12:45:59.301  INFO 12076 --- [clientInboundChannel-35] k.o.d.a.s.RefreshableCredentialsSupplier : Credentials refreshed: {old: {clientId: etx, tokenHash: 5b90493918766fa0b23fcc63aa93455b, exp: **2021-06-03T10:46:56Z**, life: PT56.699S}, new: {clientId: etx, tokenHash: 92730944006c9c39aa8ff5026c7c4090, exp: 2021-06-03T10:50:59Z, life: PT4M59.699S}}
2021-06-03 12:45:59.302  INFO 12076 --- [clientInboundChannel-35] k.o.d.a.s.RefreshableCredentialsSupplier : Supplier deleted: {lastSupply: {moment: 2021-06-03T10:43:38.762Z, creds: {clientId: etx, tokenHash: 5b90493918766fa0b23fcc63aa93455b, exp: **2021-06-03T10:46:56Z**, life: PT56.699S}}, curCreds: {clientId: etx, tokenHash: 92730944006c9c39aa8ff5026c7c4090, exp: 2021-06-03T10:50:59Z, life: PT4M59.698S}, prevCreds: {clientId: etx, tokenHash: 5b90493918766fa0b23fcc63aa93455b, exp: **2021-06-03T10:46:56Z**, life: PT56.698S}}
2021-06-03 12:46:09.322  INFO 12076 --- [pulsar-timer-15-1] o.a.p.c.impl.ConsumerStatsRecorderImpl   : [persistent://dbus/test/dummy-objects] [reader-bda41b7606] [11266] Prefetched messages: 0 --- Consume throughput received: 0,02 msgs/s --- 0,00 Mbit/s --- Ack sent rate: 0,00 ack/s --- Failed messages: 0 --- batch messages: 0 ---Failed acks: 0
2021-06-03 12:46:11.809  INFO 12076 --- [pulsar-client-io-16-1] o.a.p.c.impl.ProducerStatsRecorderImpl   : Starting Pulsar producer perf with config: {
2021-06-03 12:46:11.811  INFO 12076 --- [pulsar-client-io-16-1] o.a.p.c.impl.ProducerStatsRecorderImpl   : Pulsar client config: {
  "serviceUrl" : "pulsar://bio-qa-demo.ktc-onelab.lcl:6650",
2021-06-03 12:46:11.923  INFO 12076 --- [pulsar-client-io-16-1] o.a.pulsar.client.impl.ProducerImpl      : [persistent://dbus/test/dummy-objects] [null] Creating producer on cnx [id: 0xb64f1099, L:/172.19.5.106:59320 - R:bio-qa-demo.ktc-onelab.lcl/172.24.9.126:6650]
2021-06-03 12:46:12.042  INFO 12076 --- [pulsar-client-io-16-1] o.a.pulsar.client.impl.ProducerImpl      : [persistent://dbus/test/dummy-objects] [databus-75-1443] Created producer on cnx [id: 0xb64f1099, L:/172.19.5.106:59320 - R:bio-qa-demo.ktc-onelab.lcl/172.24.9.126:6650]
2021-06-03 12:46:12.274  INFO 12076 --- [pulsar-client-io-16-1] o.a.pulsar.client.impl.ProducerImpl      : [persistent://dbus/test/dummy-objects] [databus-75-1443] Closed Producer
2021-06-03 12:46:43.532  INFO 12076 --- [pulsar-client-io-16-1] o.a.p.c.impl.ProducerStatsRecorderImpl   : Starting Pulsar producer perf with config: {
2021-06-03 12:46:43.535  INFO 12076 --- [pulsar-client-io-16-1] o.a.p.c.impl.ProducerStatsRecorderImpl   : Pulsar client config: {
  "serviceUrl" : "pulsar://bio-qa-demo.ktc-onelab.lcl:6650",
2021-06-03 12:46:43.647  INFO 12076 --- [pulsar-client-io-16-1] o.a.pulsar.client.impl.ProducerImpl      : [persistent://dbus/test/dummy-objects] [null] Creating producer on cnx [id: 0xb64f1099, L:/172.19.5.106:59320 - R:bio-qa-demo.ktc-onelab.lcl/172.24.9.126:6650]
2021-06-03 12:46:43.761  INFO 12076 --- [pulsar-client-io-16-1] o.a.pulsar.client.impl.ProducerImpl      : [persistent://dbus/test/dummy-objects] [databus-75-1448] Created producer on cnx [id: 0xb64f1099, L:/172.19.5.106:59320 - R:bio-qa-demo.ktc-onelab.lcl/172.24.9.126:6650]
2021-06-03 12:46:43.991  INFO 12076 --- [pulsar-client-io-16-1] o.a.pulsar.client.impl.ProducerImpl      : [persistent://dbus/test/dummy-objects] [databus-75-1448] Closed Producer
2021-06-03 12:47:09.324  INFO 12076 --- [pulsar-timer-15-1] o.a.p.c.impl.ConsumerStatsRecorderImpl   : [persistent://dbus/test/dummy-objects] [reader-bda41b7606] [11266] Prefetched messages: 0 --- Consume throughput received: 0,03 msgs/s --- 0,00 Mbit/s --- Ack sent rate: 0,00 ack/s --- Failed messages: 0 --- batch messages: 0 ---Failed acks: 0
2021-06-03 12:47:15.302  INFO 12076 --- [pulsar-client-io-16-1] o.a.p.c.impl.ProducerStatsRecorderImpl   : Starting Pulsar producer perf with config: {
2021-06-03 12:47:15.305  INFO 12076 --- [pulsar-client-io-16-1] o.a.p.c.impl.ProducerStatsRecorderImpl   : Pulsar client config: {
  "serviceUrl" : "pulsar://bio-qa-demo.ktc-onelab.lcl:6650",
2021-06-03 12:47:15.419  INFO 12076 --- [pulsar-client-io-16-1] o.a.pulsar.client.impl.ProducerImpl      : [persistent://dbus/test/dummy-objects] [null] Creating producer on cnx [id: 0xb64f1099, L:/172.19.5.106:59320 - R:bio-qa-demo.ktc-onelab.lcl/172.24.9.126:6650]
2021-06-03 12:47:15.537  INFO 12076 --- [pulsar-client-io-16-1] o.a.pulsar.client.impl.ProducerImpl      : [persistent://dbus/test/dummy-objects] [databus-75-1449] Created producer on cnx [id: 0xb64f1099, L:/172.19.5.106:59320 - R:bio-qa-demo.ktc-onelab.lcl/172.24.9.126:6650]
2021-06-03 12:47:15.773  INFO 12076 --- [pulsar-client-io-16-1] o.a.pulsar.client.impl.ProducerImpl      : [persistent://dbus/test/dummy-objects] [databus-75-1449] Closed Producer
2021-06-03 12:47:38.882  WARN 12076 --- [pulsar-client-io-16-1] org.apache.pulsar.client.impl.ClientCnx  : [id: 0xb64f1099, L:/172.19.5.106:59320 - R:bio-qa-demo.ktc-onelab.lcl/172.24.9.126:6650] Received error from server: Failed to authentication token: JWT expired at **2021-06-03T10:46:56Z**. Current time: 2021-06-03T10:47:38Z, a difference of 42823 milliseconds.  Allowed clock skew: 0 milliseconds.
2021-06-03 12:47:38.883 ERROR 12076 --- [pulsar-client-io-16-1] org.apache.pulsar.client.impl.ClientCnx  : [id: 0xb64f1099, L:/172.19.5.106:59320 - R:bio-qa-demo.ktc-onelab.lcl/172.24.9.126:6650] Failed to authenticate the client
2021-06-03 12:47:38.883  WARN 12076 --- [pulsar-client-io-16-1] org.apache.pulsar.client.impl.ClientCnx  : [id: 0xb64f1099, L:/172.19.5.106:59320 - R:bio-qa-demo.ktc-onelab.lcl/172.24.9.126:6650] Received unknown request id from server: -1
2021-06-03 12:47:38.883  INFO 12076 --- [pulsar-client-io-16-1] org.apache.pulsar.client.impl.ClientCnx  : [id: 0xb64f1099, L:/172.19.5.106:59320 ! R:bio-qa-demo.ktc-onelab.lcl/172.24.9.126:6650] Disconnected
2021-06-03 12:47:46.913  WARN 12076 --- [pulsar-client-io-16-1] org.apache.pulsar.client.impl.ClientCnx  : [id: 0x570ca8a3, L:/172.19.5.106:59319 - R:bio-qa-demo.ktc-onelab.lcl/172.24.9.126:6650] Received error from server: Disconnected from server at broker1/10.0.4.44:6650
2021-06-03 12:47:46.919  WARN 12076 --- [pulsar-client-io-16-1] o.a.p.c.impl.BinaryProtoLookupService    : [persistent://dbus/test/dummy-objects] failed to get schema : org.apache.pulsar.client.api.PulsarClientException$LookupException: Disconnected from server at broker1/10.0.4.44:6650
java.util.concurrent.CompletionException: org.apache.pulsar.client.api.PulsarClientException$LookupException: Disconnected from server at broker1/10.0.4.44:6650
	at org.apache.pulsar.client.impl.ClientCnx.handleError(ClientCnx.java:655) ~[pulsar-client-2.7.2.jar:2.7.2]
	at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:197) ~[pulsar-client-2.7.2.jar:2.7.2]
	at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[pulsar-client-2.7.2.jar:2.7.2]
	at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[pulsar-client-2.7.2.jar:2.7.2]
	at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[pulsar-client-2.7.2.jar:2.7.2]
	at org.apache.pulsar.shade.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324) ~[pulsar-client-2.7.2.jar:2.7.2]
	at org.apache.pulsar.shade.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296) ~[pulsar-client-2.7.2.jar:2.7.2]
	at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[pulsar-client-2.7.2.jar:2.7.2]
	at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[pulsar-client-2.7.2.jar:2.7.2]
	at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[pulsar-client-2.7.2.jar:2.7.2]
	at org.apache.pulsar.shade.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[pulsar-client-2.7.2.jar:2.7.2]
	at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[pulsar-client-2.7.2.jar:2.7.2]
	at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[pulsar-client-2.7.2.jar:2.7.2]
	at org.apache.pulsar.shade.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[pulsar-client-2.7.2.jar:2.7.2]
	at org.apache.pulsar.shade.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) ~[pulsar-client-2.7.2.jar:2.7.2]
	at org.apache.pulsar.shade.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719) ~[pulsar-client-2.7.2.jar:2.7.2]
	at org.apache.pulsar.shade.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655) ~[pulsar-client-2.7.2.jar:2.7.2]
	at org.apache.pulsar.shade.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581) ~[pulsar-client-2.7.2.jar:2.7.2]
	at org.apache.pulsar.shade.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493) ~[pulsar-client-2.7.2.jar:2.7.2]
	at org.apache.pulsar.shade.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) ~[pulsar-client-2.7.2.jar:2.7.2]
	at org.apache.pulsar.shade.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[pulsar-client-2.7.2.jar:2.7.2]
	at org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[pulsar-client-2.7.2.jar:2.7.2]
Caused by: org.apache.pulsar.client.api.PulsarClientException$LookupException: Disconnected from server at broker1/10.0.4.44:6650
	at org.apache.pulsar.client.impl.ClientCnx.getPulsarClientException(ClientCnx.java:1033) ~[pulsar-client-2.7.2.jar:2.7.2]
2021-06-03 12:47:47.048  WARN 12076 --- [pulsar-client-io-16-1] org.apache.pulsar.client.impl.ClientCnx  : [id: 0x570ca8a3, L:/172.19.5.106:59319 - R:bio-qa-demo.ktc-onelab.lcl/172.24.9.126:6650] Received error from server: Disconnected from server at broker2/10.0.4.45:6650
2021-06-03 12:47:47.048  WARN 12076 --- [pulsar-client-io-16-1] o.a.p.c.impl.BinaryProtoLookupService    : [persistent://dbus/test/dummy-objects] failed to get schema : org.apache.pulsar.client.api.PulsarClientException$LookupException: Disconnected from server at broker2/10.0.4.45:6650
java.util.concurrent.CompletionException: org.apache.pulsar.client.api.PulsarClientException$LookupException: Disconnected from server at broker2/10.0.4.45:6650
	at org.apache.pulsar.client.impl.ClientCnx.handleError(ClientCnx.java:655) ~[pulsar-client-2.7.2.jar:2.7.2]
	at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:197) ~[pulsar-client-2.7.2.jar:2.7.2]
	at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[pulsar-client-2.7.2.jar:2.7.2]
	at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[pulsar-client-2.7.2.jar:2.7.2]
	at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[pulsar-client-2.7.2.jar:2.7.2]
	at org.apache.pulsar.shade.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324) ~[pulsar-client-2.7.2.jar:2.7.2]
	at org.apache.pulsar.shade.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296) ~[pulsar-client-2.7.2.jar:2.7.2]
	at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[pulsar-client-2.7.2.jar:2.7.2]
	at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[pulsar-client-2.7.2.jar:2.7.2]
	at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[pulsar-client-2.7.2.jar:2.7.2]
	at org.apache.pulsar.shade.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[pulsar-client-2.7.2.jar:2.7.2]
	at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[pulsar-client-2.7.2.jar:2.7.2]
	at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[pulsar-client-2.7.2.jar:2.7.2]
	at org.apache.pulsar.shade.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[pulsar-client-2.7.2.jar:2.7.2]
	at org.apache.pulsar.shade.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) ~[pulsar-client-2.7.2.jar:2.7.2]
	at org.apache.pulsar.shade.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719) ~[pulsar-client-2.7.2.jar:2.7.2]
	at org.apache.pulsar.shade.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655) ~[pulsar-client-2.7.2.jar:2.7.2]
	at org.apache.pulsar.shade.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581) ~[pulsar-client-2.7.2.jar:2.7.2]
	at org.apache.pulsar.shade.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493) ~[pulsar-client-2.7.2.jar:2.7.2]
	at org.apache.pulsar.shade.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) ~[pulsar-client-2.7.2.jar:2.7.2]
	at org.apache.pulsar.shade.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[pulsar-client-2.7.2.jar:2.7.2]
	at org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[pulsar-client-2.7.2.jar:2.7.2]
Caused by: org.apache.pulsar.client.api.PulsarClientException$LookupException: Disconnected from server at broker2/10.0.4.45:6650
	at org.apache.pulsar.client.impl.ClientCnx.getPulsarClientException(ClientCnx.java:1033) ~[pulsar-client-2.7.2.jar:2.7.2]
2021-06-03 12:47:47.170  WARN 12076 --- [pulsar-client-io-16-1] org.apache.pulsar.client.impl.ClientCnx  : [id: 0x570ca8a3, L:/172.19.5.106:59319 - R:bio-qa-demo.ktc-onelab.lcl/172.24.9.126:6650] Received error from server: Disconnected from server at broker3/10.0.4.46:6650
2021-06-03 12:47:47.172  WARN 12076 --- [pulsar-client-io-16-1] o.a.p.c.impl.BinaryProtoLookupService    : [persistent://dbus/test/dummy-objects] failed to get schema : org.apache.pulsar.client.api.PulsarClientException$LookupException: Disconnected from server at broker3/10.0.4.46:6650
java.util.concurrent.CompletionException: org.apache.pulsar.client.api.PulsarClientException$LookupException: Disconnected from server at broker3/10.0.4.46:6650
	at org.apache.pulsar.client.impl.ClientCnx.handleError(ClientCnx.java:655) ~[pulsar-client-2.7.2.jar:2.7.2]
	at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:197) ~[pulsar-client-2.7.2.jar:2.7.2]
	at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[pulsar-client-2.7.2.jar:2.7.2]
	at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[pulsar-client-2.7.2.jar:2.7.2]
	at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[pulsar-client-2.7.2.jar:2.7.2]
	at org.apache.pulsar.shade.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324) ~[pulsar-client-2.7.2.jar:2.7.2]
	at org.apache.pulsar.shade.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296) ~[pulsar-client-2.7.2.jar:2.7.2]
	at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[pulsar-client-2.7.2.jar:2.7.2]
	at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[pulsar-client-2.7.2.jar:2.7.2]
	at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[pulsar-client-2.7.2.jar:2.7.2]
	at org.apache.pulsar.shade.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[pulsar-client-2.7.2.jar:2.7.2]
	at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[pulsar-client-2.7.2.jar:2.7.2]
	at org.apache.pulsar.shade.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[pulsar-client-2.7.2.jar:2.7.2]
	at org.apache.pulsar.shade.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[pulsar-client-2.7.2.jar:2.7.2]
	at org.apache.pulsar.shade.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) ~[pulsar-client-2.7.2.jar:2.7.2]
	at org.apache.pulsar.shade.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719) ~[pulsar-client-2.7.2.jar:2.7.2]
	at org.apache.pulsar.shade.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655) ~[pulsar-client-2.7.2.jar:2.7.2]
	at org.apache.pulsar.shade.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581) ~[pulsar-client-2.7.2.jar:2.7.2]
	at org.apache.pulsar.shade.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493) ~[pulsar-client-2.7.2.jar:2.7.2]
	at org.apache.pulsar.shade.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) ~[pulsar-client-2.7.2.jar:2.7.2]
	at org.apache.pulsar.shade.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[pulsar-client-2.7.2.jar:2.7.2]
	at org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[pulsar-client-2.7.2.jar:2.7.2]
Caused by: org.apache.pulsar.client.api.PulsarClientException$LookupException: Disconnected from server at broker3/10.0.4.46:6650
	at org.apache.pulsar.client.impl.ClientCnx.getPulsarClientException(ClientCnx.java:1033) ~[pulsar-client-2.7.2.jar:2.7.2]
    CAUSE   >>>> org.apache.pulsar.client.api.PulsarClientException$LookupException: Disconnected from server at broker3/10.0.4.46:6650
2021-06-03 12:47:47.373  INFO 12076 --- [pulsar-client-io-12-1] o.a.pulsar.client.impl.ConsumerImpl      : [persistent://dbus/test/dummy-objects] [reader-bda41b7606] Closed consumer
2021-06-03 12:50:51.503  INFO 12076 --- [MessageBroker-8] k.o.d.a.s.RefreshableCredentialsSupplier : Supplier deleted: {lastSupply: {moment: 2021-06-03T10:47:38.760Z, creds: {clientId: etx, tokenHash: 92730944006c9c39aa8ff5026c7c4090, exp: 2021-06-03T10:50:59Z, life: PT7.497S}}, curCreds: {clientId: etx, tokenHash: 92730944006c9c39aa8ff5026c7c4090, exp: 2021-06-03T10:50:59Z, life: PT7.497S}, prevCreds: {clientId: etx, tokenHash: 5b90493918766fa0b23fcc63aa93455b, exp: **2021-06-03T10:46:56Z**, life: PT-3M-55.503S}}
2021-06-03 12:50:51.503  INFO 12076 --- [MessageBroker-8] o.a.pulsar.client.impl.PulsarClientImpl  : Client closing. URL: pulsar://bio-qa-demo.ktc-onelab.lcl:6650
2021-06-03 12:50:51.505  INFO 12076 --- [pulsar-client-io-16-1] org.apache.pulsar.client.impl.ClientCnx  : [id: 0x570ca8a3, L:/172.19.5.106:59319 ! R:bio-qa-demo.ktc-onelab.lcl/172.24.9.126:6650] Disconnected
2021-06-03 12:51:38.882  WARN 12076 --- [pulsar-client-io-12-1] org.apache.pulsar.client.impl.ClientCnx  : [id: 0xb5aa0fd1, L:/172.19.5.106:62138 - R:bio-qa-demo.ktc-onelab.lcl/172.24.9.126:6650] Received error from server: Failed to authentication token: JWT expired at 2021-06-03T10:50:59Z. Current time: 2021-06-03T10:51:38Z, a difference of 39827 milliseconds.  Allowed clock skew: 0 milliseconds.
2021-06-03 12:51:38.884 ERROR 12076 --- [pulsar-client-io-12-1] org.apache.pulsar.client.impl.ClientCnx  : [id: 0xb5aa0fd1, L:/172.19.5.106:62138 - R:bio-qa-demo.ktc-onelab.lcl/172.24.9.126:6650] Failed to authenticate the client
2021-06-03 12:51:38.885  WARN 12076 --- [pulsar-client-io-12-1] org.apache.pulsar.client.impl.ClientCnx  : [id: 0xb5aa0fd1, L:/172.19.5.106:62138 - R:bio-qa-demo.ktc-onelab.lcl/172.24.9.126:6650] Received unknown request id from server: -1
2021-06-03 12:51:38.885  INFO 12076 --- [pulsar-client-io-12-1] org.apache.pulsar.client.impl.ClientCnx  : [id: 0xb5aa0fd1, L:/172.19.5.106:62138 ! R:bio-qa-demo.ktc-onelab.lcl/172.24.9.126:6650] Disconnected

If we look at corresponding broker logs (broker3) we see that connection is being stablished using an expired token (at a time AFTER the refresh):

12:47:47.109 [pulsar-io-23-8] INFO  org.apache.pulsar.broker.service.ServerCnx - New connection from /10.0.4.50:41828
12:47:47.114 [pulsar-io-23-8] ERROR org.apache.pulsar.broker.authentication.AuthenticationProviderList - Failed to initialize a new auth state from /10.0.4.50:41828
javax.naming.AuthenticationException: Failed to authentication token: JWT expired at **2021-06-03T10:46:56Z**. Current time: 2021-06-03T10:47:47Z, a difference of 51114 milliseconds.  Allowed clock skew: 0 milliseconds.
  at org.apache.pulsar.broker.authentication.AuthenticationProviderToken.authenticateToken(AuthenticationProviderToken.java:223) ~[org.apache.pulsar-pulsar-broker-common-2.7.2.jar:2.7.2]
  at org.apache.pulsar.broker.authentication.AuthenticationProviderToken.access$100(AuthenticationProviderToken.java:48) ~[org.apache.pulsar-pulsar-broker-common-2.7.2.jar:2.7.2]
  at org.apache.pulsar.broker.authentication.AuthenticationProviderToken$TokenAuthenticationState.authenticate(AuthenticationProviderToken.java:321) ~[org.apache.pulsar-pulsar-broker-common-2.7.2.jar:2.7.2]
  at org.apache.pulsar.broker.authentication.AuthenticationProviderToken$TokenAuthenticationState.<init>(AuthenticationProviderToken.java:309) ~[org.apache.pulsar-pulsar-broker-common-2.7.2.jar:2.7.2]
  at org.apache.pulsar.broker.authentication.AuthenticationProviderToken.newAuthState(AuthenticationProviderToken.java:155) ~[org.apache.pulsar-pulsar-broker-common-2.7.2.jar:2.7.2]
  at org.apache.pulsar.broker.authentication.AuthenticationProviderList.lambda$newAuthState$1(AuthenticationProviderList.java:173) ~[org.apache.pulsar-pulsar-broker-common-2.7.2.jar:2.7.2]
  at org.apache.pulsar.broker.authentication.AuthenticationProviderList.applyAuthProcessor(AuthenticationProviderList.java:50) ~[org.apache.pulsar-pulsar-broker-common-2.7.2.jar:2.7.2]
  at org.apache.pulsar.broker.authentication.AuthenticationProviderList.newAuthState(AuthenticationProviderList.java:170) [org.apache.pulsar-pulsar-broker-common-2.7.2.jar:2.7.2]
  at org.apache.pulsar.broker.service.ServerCnx.handleConnect(ServerCnx.java:754) [org.apache.pulsar-pulsar-broker-2.7.2.jar:2.7.2]
  at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:185) [org.apache.pulsar-pulsar-common-2.7.2.jar:2.7.2]
  at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [io.netty-netty-transport-4.1.60.Final.jar:4.1.60.Final]
  at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [io.netty-netty-transport-4.1.60.Final.jar:4.1.60.Final]
  at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) [io.netty-netty-transport-4.1.60.Final.jar:4.1.60.Final]
  at io.netty.handler.flow.FlowControlHandler.dequeue(FlowControlHandler.java:200) [io.netty-netty-handler-4.1.60.Final.jar:4.1.60.Final]
  at io.netty.handler.flow.FlowControlHandler.channelRead(FlowControlHandler.java:162) [io.netty-netty-handler-4.1.60.Final.jar:4.1.60.Final]
  at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [io.netty-netty-transport-4.1.60.Final.jar:4.1.60.Final]
  at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [io.netty-netty-transport-4.1.60.Final.jar:4.1.60.Final]
  at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) [io.netty-netty-transport-4.1.60.Final.jar:4.1.60.Final]
  at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324) [io.netty-netty-codec-4.1.60.Final.jar:4.1.60.Final]
  at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296) [io.netty-netty-codec-4.1.60.Final.jar:4.1.60.Final]
  at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [io.netty-netty-transport-4.1.60.Final.jar:4.1.60.Final]
  at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [io.netty-netty-transport-4.1.60.Final.jar:4.1.60.Final]
  at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) [io.netty-netty-transport-4.1.60.Final.jar:4.1.60.Final]
  at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) [io.netty-netty-transport-4.1.60.Final.jar:4.1.60.Final]
  at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [io.netty-netty-transport-4.1.60.Final.jar:4.1.60.Final]
  at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [io.netty-netty-transport-4.1.60.Final.jar:4.1.60.Final]
  at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) [io.netty-netty-transport-4.1.60.Final.jar:4.1.60.Final]
  at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:795) [io.netty-netty-transport-native-epoll-4.1.60.Final-linux-x86_64.jar:4.1.60.Final]
  at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:480) [io.netty-netty-transport-native-epoll-4.1.60.Final-linux-x86_64.jar:4.1.60.Final]
  at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378) [io.netty-netty-transport-native-epoll-4.1.60.Final-linux-x86_64.jar:4.1.60.Final]
  at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) [io.netty-netty-common-4.1.60.Final.jar:4.1.60.Final]
  at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) [io.netty-netty-common-4.1.60.Final.jar:4.1.60.Final]
  at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [io.netty-netty-common-4.1.60.Final.jar:4.1.60.Final]
  at java.lang.Thread.run(Thread.java:748) [?:1.8.0_282]
12:47:47.115 [pulsar-io-23-8] WARN  org.apache.pulsar.broker.service.ServerCnx - [/10.0.4.50:41828] Unable to authenticate: Failed to authentication token: JWT expired at **2021-06-03T10:46:56Z**. Current time: 2021-06-03T10:47:47Z, a difference of 51114 milliseconds.  Allowed clock skew: 0 milliseconds.
12:47:47.115 [pulsar-io-23-8] INFO  org.apache.pulsar.broker.service.ServerCnx - Closed connection from /10.0.4.50:41828

Notice the time 2021-06-03T10:46:56Z that is the expiration time of the second token. First refresh worked OK as broker did not fail past first 5 minutes. Error occurs on broker at 12:47:47.109 reporting an expired token at 2021-06-03T10:46:56Z. However that token had already been refreshed one minute before at 2021-06-03 12:45:59.301 (local time is CEST or GMT+2):

2021-06-03 12:45:59.301  INFO 12076 --- [clientInboundChannel-35] k.o.d.a.s.RefreshableCredentialsSupplier : Credentials refreshed: {old: {clientId: etx, tokenHash: 5b90493918766fa0b23fcc63aa93455b, exp: **2021-06-03T10:46:56Z**, life: PT56.699S}, new: {clientId: etx, tokenHash: 92730944006c9c39aa8ff5026c7c4090, exp: 2021-06-03T10:50:59Z, life: PT4M59.699S}}

After that moment following producers were created at following moments (using third token) 2021-06-03 12:46:11.809, 2021-06-03 12:46:43.532, 2021-06-03 12:47:15.302. However at 2021-06-03 12:47:38.882 client reported connection failed (using second token) for error occurring on broker at 12:47:47.109.

Expected behavior

All previous tokens belonging to same client must be refreshed at proxy connection pool once they are obtained through connections from client.

This is a blocker issue that prevents us from using Pulsar Proxy on our clusters.

@fmiguelez fmiguelez added the type/bug The PR fixed a bug or issue reported a bug label Jun 3, 2021
@fmiguelez
Copy link
Contributor Author

We have tested this on 2.7.0, 2.7.1 and 2.7.2

@eolivelli
Copy link
Contributor

Nice description and proposal
Would you have time to contribute an implementation?

@fmiguelez
Copy link
Contributor Author

@eolivelli I tried looking for exact piece of code where this happens but I find code a little bit difficult for me to follow (at least on github). Probably I can find a solution with some help from someone pointing me in the right direction.

@codelipenghui
Copy link
Contributor

The issue had no activity for 30 days, mark with Stale label.

@gengmao
Copy link

gengmao commented Dec 14, 2022

@nodece my understanding is this issue was not fully fixed with #17517 and #17831. Is it still pending on #18130?

@nodece
Copy link
Member

nodece commented Dec 15, 2022

Is it still pending on #18130?

@gengmao Right.

@lhotari
Copy link
Member

lhotari commented Dec 20, 2022

Is it still pending on #18130?

@gengmao Right.

@nodece @codelipenghui I think that there's a major gap with the solution in #17517, #17831 or #18130 if the intention is to fix token refresh with the Pulsar Proxy.

One detail of the proxy is that after the proxy has connected, it switches to a mode where plain bytes are proxied without decoding the protocol messages.

switch (state) {
case Init:
case Connecting:
case ProxyLookupRequests:
// Do the regular decoding for the Connected message
super.channelRead(ctx, msg);
break;
case ProxyConnectionToBroker:
if (directProxyHandler != null) {
ProxyService.OPS_COUNTER.inc();
if (msg instanceof ByteBuf) {
int bytes = ((ByteBuf) msg).readableBytes();
directProxyHandler.getInboundChannelRequestsRate().recordEvent(bytes);
ProxyService.BYTES_COUNTER.inc(bytes);
}
directProxyHandler.outboundChannel.writeAndFlush(msg)
.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
if (service.proxyZeroCopyModeEnabled && service.proxyLogLevel == 0) {
if (!directProxyHandler.isTlsOutboundChannel && !isTlsInboundChannel) {
spliceNIC2NIC((EpollSocketChannel) ctx.channel(),
(EpollSocketChannel) directProxyHandler.outboundChannel, SPLICE_BYTES)
.addListener(future -> {
ProxyService.OPS_COUNTER.inc();
ProxyService.BYTES_COUNTER.inc(SPLICE_BYTES);
directProxyHandler.getInboundChannelRequestsRate().recordEvent(SPLICE_BYTES);
});
}
}
} else {
LOG.warn("Received message of type {} while connection to broker is missing in state {}. "
+ "Dropping the input message (readable bytes={}).", msg.getClass(), state,
msg instanceof ByteBuf ? ((ByteBuf) msg).readableBytes() : -1);
}
break;

This is where the frameDecoder is removed:

// direct tcp proxy
inboundChannel.pipeline().remove("frameDecoder");
outboundChannel.pipeline().remove("frameDecoder");

There's no information when a Pulsar binary protocol message starts and ends, and it's not possible to intercept or inject any messages between the client and the broker in the current solution. If any messages are injected, there's a chance that it corrupts the traffic since the message might get injected in a middle of another message since the proxy isn't doing any framing.

Solving this issue will be tricky to solve (while keeping the same performance characteristics). It needs more design discussions and documenting the design.

It feels that there's no real need to inject traffic in the proxy or do any special token refresh. The proxy stops authenticating and simply passes the bytes back and forth after the connection has been established.

@lhotari lhotari reopened this Dec 20, 2022
@lhotari
Copy link
Member

lhotari commented Dec 20, 2022

Older changes #1707 and #1169 could provide more context.

@nodece
Copy link
Member

nodece commented Dec 20, 2022

Thank @lhotari for your explanation.

One detail of the proxy is that after the proxy has connected, it switches to a mode where plain bytes are proxied without decoding the protocol messages.

I know this, so the proxy authentication data should not contain an expiration time, and then just refresh the client authentication data, and we also should notice we have a client on the proxy, which is used to lookup the topic URL and metadata, sometimes the client is ignored.

#17831 didn't refresh the proxy's client authentication data, here should be the user's authentication data.

The proxy's client passes two authentication data, one from the self, and one from the actual user.

#18130 fixes a bug that updates the authentication variable, this should be an important fix.

@michaeljmarshall
Copy link
Member

I agree with @lhotari that the solution in #17831 will lead to corruption.

To take a step back, I think we need to clarify what the actual issue. Once the client's connection through the proxy is established with the broker, the broker's auth refresh challenge protocol takes over, assuming everything is configured correctly.

What appears to be the problem is that the clientAuthData in the ProxyConnection is never updated. That means the clientCnxSupplier will have stale data when making connections to new brokers. I'll try to put together a PR to show a solution.

@michaeljmarshall
Copy link
Member

I agree with @lhotari that the solution in #17831 will lead to corruption.

I am not sure that I think this any more.

The only connections that appear to be the problem are the ones with a ProxyLookupRequests state. Notably, #17831 only applies to these connections:

if (service.getConfiguration().isForwardAuthorizationCredentials()
&& connectionPool != null && state == State.ProxyLookupRequests) {

@michaeljmarshall
Copy link
Member

I think there is still an additional issue we'll need to solve. I need to sign off now, but I have a partial fix and will continue working on it in a few hours.

@michaeljmarshall
Copy link
Member

One problem I see is that the auth data is hard coded:

clientCnxSupplier = () -> new ProxyClientCnx(clientConf, service.getWorkerGroup(), clientAuthRole,
clientAuthData, clientAuthMethod, protocolVersionToAdvertise,
service.getConfiguration().isForwardAuthorizationCredentials(), this);

As a result, when a client connects for a lookup with an existing client/proxy connection but triggers a new proxy/broker connection, the auth data may be expired.

I have a draft solution for this issue here: #19026. I am still working on a test.

@lhotari
Copy link
Member

lhotari commented Dec 22, 2022

I know this, so the proxy authentication data should not contain an expiration time, and then just refresh the client authentication data, and we also should notice we have a client on the proxy, which is used to lookup the topic URL and metadata, sometimes the client is ignored.

@nodece
When the proxy switches to the mode where plain bytes are proxies, the proxy should never use the authentication data anymore. This is why it's unnecessary to keep it up-to-date after the proxy switches the connection to proxying mode.

What would be helpful is to describe the scenario and sketch sequence diagrams of the scenario where refreshing of the authentication tokens would be needed.

We don't currently have a sufficient description of the problem so that there would be a reasonable solution to the problem.

The connection pool in the proxy is bound to the connection from the client to the proxy.
On the client side, the connection from the client to the proxy is keyed by the "logical address" and the "physical address".
Java client:

/**
* Get a connection from the pool.
* <p>
* The connection can either be created or be coming from the pool itself.
* <p>
* When specifying multiple addresses, the logicalAddress is used as a tag for the broker, while the physicalAddress
* is where the connection is actually happening.
* <p>
* These two addresses can be different when the client is forced to connect through a proxy layer. Essentially, the
* pool is using the logical address as a way to decide whether to reuse a particular connection.
*
* @param logicalAddress
* the address to use as the broker tag
* @param physicalAddress
* the real address where the TCP connection should be made
* @return a future that will produce the ClientCnx object
*/
public CompletableFuture<ClientCnx> getConnection(InetSocketAddress logicalAddress,
InetSocketAddress physicalAddress) {

Pulsar Go client:
https://github.com/apache/pulsar-client-go/blob/d92fb1407d3d39c8a498dd7c7abdc0bbb3fc7e1a/pulsar/internal/connection_pool.go#L75-L76
Pulsar C++ client:
https://github.com/apache/pulsar-client-cpp/blob/fa3ac76eddb08c3eb9d865332214af5aa5a5fe88/lib/ConnectionPool.cc#L64-L65

Because of this reason, the connection pool held within the ProxyConnection should contain only a single connection to do lookups.

// ConnectionPool is used by the proxy to issue lookup requests
private ConnectionPool connectionPool;

I wonder if we could come up with a better description of the problem and the solution.

@nodece
Copy link
Member

nodece commented Dec 23, 2022

image

I fixed the lookup action by #17831.

When the proxy switches to the mode where plain bytes are proxies, the proxy should never use the authentication data anymore. This is why it's unnecessary to keep it up-to-date after the proxy switches the connection to proxying mode.

You're referring to the final process, not lookup process, there are two connections between connect to broker and lookup.

@lhotari
Copy link
Member

lhotari commented Jan 11, 2023

I fixed the lookup action by #17831.

@nodece Please add the proper problem description to PR #17831. It will be easier to review the PR when there's a clear description of the context.

@lhotari
Copy link
Member

lhotari commented Jan 11, 2023

@nodece since #17831 is already merged, I guess PR #18130 is where there should be proper problem description. Please improve it.

@lhotari
Copy link
Member

lhotari commented Jan 11, 2023

You're referring to the final process, not lookup process, there are two connections between connect to broker and lookup.

@nodece I'm not exactly sure what you mean with this comment.

The ProxyConnection is one of these states:

  enum State {
        Init,

        // Connecting between user client and proxy server.
        // Mutual authn needs verify between client and proxy server several times.
        Connecting,

        // Proxy the lookup requests to a random broker
        // Follow redirects
        ProxyLookupRequests,

        // Connecting to the broker
        ProxyConnectingToBroker,

        // If we are proxying a connection to a specific broker, we
        // are just forwarding data between the 2 connections, without
        // looking into it
        ProxyConnectionToBroker,

        Closing,

        Closed,
    }

Yes, there's a separate connection for lookups and after the state goes to ProxyConnectingToBroker/ProxyConnectionToBroker, the connection(s) used for lookups, will no longer be needed.

This is what I meant with my comment "When the proxy switches to the mode where plain bytes are proxies, the proxy should never use the authentication data anymore. This is why it's unnecessary to keep it up-to-date after the proxy switches the connection to proxying mode."

I see that you know this, but my point is that it's not described in the PR descriptions.
It's really hard to guess what the intention is if there's no code comments or description in the PR of the intent.

For example, iterating through the connections in the connection pool (used for lookup connections) doesn't seem like the correct approach at all: https://github.com/apache/pulsar/pull/17831/files#diff-bbca5aac9dede7618187e91b91ffd0c6c8ffb836cd79ff2d104439e8cf5fc0daR545-R561

Perhaps this works for most cases where there's a single connection in the connection pool. What if there are multiple connections? It would also be helpful to add integration tests where this would happen.

@nodece
Copy link
Member

nodece commented Jan 12, 2023

Thank you for your detailed explanation and guidance.

Perhaps this works for most cases where there's a single connection in the connection pool. What if there are multiple connections? It would also be helpful to add integration tests where this would happen.

This scenario would be one client connecting different brokers by proxy, which means they use the same authentication data on the broker, and I send the new authentication data to each broker, maybe there is a performance issue here.

@michaeljmarshall
Copy link
Member

I identified an addition issue with #17831. Specifically, we go to Connecting state when the broker challenges the client's auth. That is a problem because it could lead to the client's connection getting closed by the proxy if the proxy receives an "unexpected" protocol message.

Here is my comment on the PR:
#17831 (comment)

michaeljmarshall added a commit that referenced this issue Apr 11, 2023
Fixes: #10816
PIP: #19771
Supersedes: #19026
Depends on: #20062

### Motivation

The Pulsar Proxy does not properly handle authentication data refresh when in state `ProxyLookupRequests`. The consequence is described in #10816. Essentially, the problem is that the proxy caches stale authentication data and sends it to the broker leading to connection failures.

#17831 attempted to fix the underlying problem, but it missed an important edge cases. Specifically, it missed the case that the `ConnectionPool` will have multiple connections when a lookup gets redirected. As such, the following problem exists (and is fixed by this PR):

1. Client opens connection to perform lookups.
2. Proxy connects to broker 1 to get the topic ownership info.
3. Time passes.
4. Client does an additional lookup, and this topic is on a newly created broker 2. In this case, the proxy opens a new connection with the stale client auth data.
5. Broker 2 rejects the connection because it fails with expired authentication.

### Modifications

* Remove some of the implementation from #17831. This new implementation still allows a broker to challenge the client through the proxy, but notably, it limits the number of challenges sent to the client. Further, the proxy does not challenge the client when the auth data is not expired.
* Introduce authentication refresh in the proxy so that the proxy challenges the client any time the auth data is expired.
* Update the `ProxyClientCnx` to get the `clientAuthData` from the `ProxyConnection` to ensure that it gets new authentication data.
* Add clock skew to the `AuthenticationProviderToken`. This is necessary to make some of the testing not flaky and it will also be necessary for users to configure in their clusters.

### Verifying this change

The `ProxyRefreshAuthTest` covers the existing behavior and I expanded it to cover the edge case described above.

Additionally, testing this part of the code will be much easier to test once we implement #19624.

### Documentation

- [x] `doc-not-needed`

### Matching PR in forked repository

PR in forked repository: the relevant tests pass locally, so I am going to skip the forked tests.
michaeljmarshall added a commit to michaeljmarshall/pulsar that referenced this issue Apr 20, 2023
Fixes: apache#10816
PIP: apache#19771
Supersedes: apache#19026
Depends on: apache#20062

The Pulsar Proxy does not properly handle authentication data refresh when in state `ProxyLookupRequests`. The consequence is described in apache#10816. Essentially, the problem is that the proxy caches stale authentication data and sends it to the broker leading to connection failures.

apache#17831 attempted to fix the underlying problem, but it missed an important edge cases. Specifically, it missed the case that the `ConnectionPool` will have multiple connections when a lookup gets redirected. As such, the following problem exists (and is fixed by this PR):

1. Client opens connection to perform lookups.
2. Proxy connects to broker 1 to get the topic ownership info.
3. Time passes.
4. Client does an additional lookup, and this topic is on a newly created broker 2. In this case, the proxy opens a new connection with the stale client auth data.
5. Broker 2 rejects the connection because it fails with expired authentication.

* Remove some of the implementation from apache#17831. This new implementation still allows a broker to challenge the client through the proxy, but notably, it limits the number of challenges sent to the client. Further, the proxy does not challenge the client when the auth data is not expired.
* Introduce authentication refresh in the proxy so that the proxy challenges the client any time the auth data is expired.
* Update the `ProxyClientCnx` to get the `clientAuthData` from the `ProxyConnection` to ensure that it gets new authentication data.
* Add clock skew to the `AuthenticationProviderToken`. This is necessary to make some of the testing not flaky and it will also be necessary for users to configure in their clusters.

The `ProxyRefreshAuthTest` covers the existing behavior and I expanded it to cover the edge case described above.

Additionally, testing this part of the code will be much easier to test once we implement apache#19624.

- [x] `doc-not-needed`

PR in forked repository: the relevant tests pass locally, so I am going to skip the forked tests.

(cherry picked from commit 075b625)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
type/bug The PR fixed a bug or issue reported a bug
Projects
None yet
7 participants