asyncproducer fails with topic does not exist when consecutive messages are published to different topics in the same millisecond #2438

Closed
dineshudayakumar opened this issue Feb 16, 2023 · 4 comments

Comments

@dineshudayakumar

Versions
Sarama: 1.37.2
Kafka: 2.0.0
Go: 1.20.1
Configuration
func ProducerConfig() *sarama.Config {
	conf := sarama.NewConfig()
	conf.Producer.Compression = sarama.CompressionSnappy
	conf.Producer.RequiredAcks = sarama.WaitForAll
	conf.Producer.Return.Successes = true
	conf.Producer.Return.Errors = true
	conf.Producer.Retry.Max = 5
	conf.Producer.Retry.Backoff = 500 * time.Millisecond
	conf.Metadata.Full = false
	conf.Metadata.Retry.Max = 50
	conf.Metadata.Retry.Backoff = 100 * time.Millisecond
	conf.ClientID = "clientID"
	conf.Version = sarama.V2_0_0_0
	return conf
}
Logs

Writing logs directly to stdout adds a few milliseconds of delay, which is enough to make the issue impossible to reproduce, so the logger prints to stdout from a goroutine instead. As a result, the log lines below might not be in the exact order.
I also modified the sarama source code to print a stack trace at every place where ErrUnknownTopicOrPartition is returned.

The test below uses a consumer to consume from 2 topics and an async producer to publish to 2 topics.


=== RUN   TestRoundTrip
Successfully initialized new client
Initializing new client
Initializing new client
Successfully initialized new client
Initializing new client
Successfully initialized new client
Initializing new client
Successfully initialized new client
Initializing new client
Successfully initialized new client
Initializing new client
Initializing new client
Consumer.MaxWaitTime is very low, which can cause high CPU and network usage. See documentation for details.
Deprecation warning: Consumer.Offsets.CommitInterval exists for historical compatibility and should not be used. Please use Consumer.Offsets.AutoCommit, the current value will be ignored
Successfully initialized new client
Successfully initialized new client
Initializing new client
Successfully initialized new client
client/metadata fetching metadata for [test_rn.pos_transaction.pb test_rn.pos_transaction_historical.pb] from broker kafka:9092
Consumer.MaxWaitTime is very low, which can cause high CPU and network usage. See documentation for details.
Deprecation warning: Consumer.Offsets.CommitInterval exists for historical compatibility and should not be used. Please use Consumer.Offsets.AutoCommit, the current value will be ignored
Connected to broker at kafka:9092 (unregistered)
client/brokers registered new broker #0 at 44fe8f289aff:29092
Consumer.MaxWaitTime is very low, which can cause high CPU and network usage. See documentation for details.
Deprecation warning: Consumer.Offsets.CommitInterval exists for historical compatibility and should not be used. Please use Consumer.Offsets.AutoCommit, the current value will be ignored
Connected to broker at 44fe8f289aff:29092 (registered as #0)
client/metadata fetching metadata for [test_rn.pos_transaction.pb test_rn.pos_transaction_historical.pb] from broker kafka:9092
client/coordinator requesting coordinator for test-ab1f9599-adae-11ed-8049-80008c44297f-default-worker from kafka:9092
client/coordinator coordinator for test-ab1f9599-adae-11ed-8049-80008c44297f-default-worker is #0 (44fe8f289aff:29092)
client/coordinator requesting coordinator for test-ab1f9599-adae-11ed-8049-80008c44297f-default-worker from kafka:9092
client/coordinator coordinator for test-ab1f9599-adae-11ed-8049-80008c44297f-default-worker is #0 (44fe8f289aff:29092)
Successfully initialized new client
Initializing new client
client/metadata fetching metadata for [test_rn.pos_transaction.pb] from broker kafka:9092
goroutine 179222 [running]:
runtime/debug.Stack()
        /usr/local/go/src/runtime/debug/stack.go:24 +0x64
runtime/debug.PrintStack()
        /usr/local/go/src/runtime/debug/stack.go:16 +0x1c
github.com/Shopify/sarama.(*client).Partitions(0x0?, {0x4006b660c0, 0x25})
        /Users/dinesh.udayakumar/go/pkg/mod/github.com/!shopify/sarama@v1.37.2/client.go:348 +0xb4
github.com/Shopify/sarama.(*topicProducer).partitionMessage.func1()
        /Users/dinesh.udayakumar/go/pkg/mod/github.com/!shopify/sarama@v1.37.2/async_producer.go:534 +0x98
github.com/eapache/go-resiliency/breaker.(*Breaker).doWork.func1(0x40062b0678?, 0x177a4?)
        /Users/dinesh.udayakumar/go/pkg/mod/github.com/eapache/[email protected]/breaker/breaker.go:85 +0x54
github.com/eapache/go-resiliency/breaker.(*Breaker).doWork(0x0?, 0x0, 0x0?)
        /Users/dinesh.udayakumar/go/pkg/mod/github.com/eapache/[email protected]/breaker/breaker.go:86 +0x30
github.com/eapache/go-resiliency/breaker.(*Breaker).Run(...)
        /Users/dinesh.udayakumar/go/pkg/mod/github.com/eapache/[email protected]/breaker/breaker.go:55
github.com/Shopify/sarama.(*topicProducer).partitionMessage(0x4007d9fe40?, 0x4005c70b40)
        /Users/dinesh.udayakumar/go/pkg/mod/github.com/!shopify/sarama@v1.37.2/async_producer.go:525 +0x90
github.com/Shopify/sarama.(*topicProducer).dispatch(0x4007d9fe40)
        /Users/dinesh.udayakumar/go/pkg/mod/github.com/!shopify/sarama@v1.37.2/async_producer.go:502 +0x60
github.com/Shopify/sarama.withRecover(0x0?)
        /Users/dinesh.udayakumar/go/pkg/mod/github.com/!shopify/sarama@v1.37.2/utils.go:43 +0x40
created by github.com/Shopify/sarama.(*asyncProducer).newTopicProducer
        /Users/dinesh.udayakumar/go/pkg/mod/github.com/!shopify/sarama@v1.37.2/async_producer.go:495 +0x1fc
err=kafka: Failed to produce message to topic test_rn.pos_transaction_historical.pb: kafka server: Request was for a topic or partition that does not exist on this broker
client/metadata fetching metadata for [test_rn.pos_transaction_historical.pb] from broker kafka:9092
Flush Start
consumer/broker/0 accumulated 6 new subscriptions
consumer/broker/0 added subscription to test_rn.pos_transaction_historical.pb/0
consumer/broker/0 added subscription to test_rn.pos_transaction.pb/2
consumer/broker/0 added subscription to test_rn.pos_transaction_historical.pb/1
consumer/broker/0 added subscription to test_rn.pos_transaction.pb/0
consumer/broker/0 added subscription to test_rn.pos_transaction_historical.pb/2
consumer/broker/0 added subscription to test_rn.pos_transaction.pb/1
Connected to broker at kafka:9092 (unregistered)
client/brokers registered new broker #0 at 44fe8f289aff:29092
producer/broker/0 state change to [open] on test_rn.pos_transaction.pb/2
producer/broker/0 starting up
Connected to broker at 44fe8f289aff:29092 (registered as #0)
    utility.go:61: lightspeed_fetcher_test.go:314 Got error: kafka producer errors: [kafka: Failed to produce message to topic test_rn.pos_transaction_historical.pb: kafka server: Request was for a topic or partition that does not exist on this broker]
--- FAIL: TestRoundTrip (0.15s)

Problem Description

We use the AsyncProducer to publish messages to Kafka. In one of our test cases we create the AsyncProducer and publish multiple messages, and intermittently the publish fails with "topic or partition does not exist on this broker". From the Kafka request logs and the sarama source code, I could identify that when the first 2 messages are published in the same millisecond, the refreshMetadata method returns nil and the partitionMessage method then returns the error.

This could be due to the following lines in the tryRefreshMetadata method in client.go:

		t := atomic.LoadInt64(&client.updateMetaDataMs)
		if !atomic.CompareAndSwapInt64(&client.updateMetaDataMs, t, time.Now().UnixNano()/int64(time.Millisecond)) {
			return nil
		}
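
To make the suspected mechanism concrete, here is a minimal sketch of the load-then-compare-and-swap pattern quoted above. It is a simplified model, not sarama's code: the `guard` type, its `load` and `tryClaim` methods, and the fixed timestamp `1000` are hypothetical names and values chosen for illustration. It shows only that when two callers take their snapshot before either one swaps, the second swap fails and that caller would return early without refreshing metadata for its own topic.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// guard is a hypothetical stand-in for the updateMetaDataMs field:
// it stores the millisecond timestamp of the last claimed refresh.
type guard struct {
	lastMs int64
}

// load returns the snapshot a caller takes before attempting the swap.
func (g *guard) load() int64 {
	return atomic.LoadInt64(&g.lastMs)
}

// tryClaim reports whether the caller won the swap. A caller that loses
// corresponds to the early "return nil" in the quoted snippet.
func (g *guard) tryClaim(snapshot, nowMs int64) bool {
	return atomic.CompareAndSwapInt64(&g.lastMs, snapshot, nowMs)
}

func main() {
	g := &guard{}

	// Two callers (one per topic) both read the old value first.
	snapA := g.load()
	snapB := g.load()

	// Caller A swaps first and proceeds with its metadata request.
	fmt.Println("caller A refreshes:", g.tryClaim(snapA, 1000))

	// Caller B's snapshot is now stale, so its swap fails and it would
	// skip the refresh even though its topic was never fetched.
	fmt.Println("caller B refreshes:", g.tryClaim(snapB, 1000))
}
```

If this model matches what happens in the client, the second topic's metadata is never requested, which would be consistent with the ErrUnknownTopicOrPartition seen in the logs above.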

@github-actions

Thank you for taking the time to raise this issue. However, it has not had any activity on it in the past 90 days and will be closed in 30 days if no updates occur.
Please check if the main branch has already resolved the issue since it was raised. If you believe the issue is still valid and you would like input from the maintainers then please comment to ask for it to be reviewed.

@github-actions github-actions bot added the stale label Jul 17, 2023
@github-actions github-actions bot closed this as not planned Aug 16, 2023
@dnwe dnwe reopened this Aug 16, 2023
@dnwe
Collaborator

dnwe commented Aug 16, 2023

@dineshudayakumar please can you test with latest sarama to see if the concurrency fixes have resolved this issue?

@github-actions github-actions bot removed the stale label Aug 16, 2023
@dineshudayakumar
Author

Sure, will test and confirm in a day or two (and thanks for fixing)

@dineshudayakumar
Author

Looks like I am not seeing this issue anymore, happy to close it. Thank you!!
