keep state info on kafka #35

Closed · Du-Li opened this issue Mar 21, 2023 · 34 comments
Labels: enhancement (New feature or request)

@Du-Li (Author) commented Mar 21, 2023

Is your feature request related to a problem? Please describe.
Currently the dcp-kafka connector writes the state info (checkpoints) back to couchbase, which consequently magnifies the workload on CB. In my tests, for example, I generated 4k RPS to CB but observed 15k RPS there, which adds almost 3x extra workload to CB. This may not be acceptable in production. Moreover, conceptually, CDC is meant to be non-intrusive to the source databases. Keeping the state info on CB would cause many problems not only in terms of capacity but also breaking that read-only promise or expectation.

Describe the solution you'd like
At least make it an option for the dcp-kafka connector to keep the state info on Kafka instead of CB.

Describe alternatives you've considered
Define an interface so the developer can choose to use Kafka or CB. The two paths will implement the same interface.

Additional context
In my use case, we are definitely not allowed to write to the CB cluster. CDC is meant to be read-only.

@Du-Li (Author) commented Mar 22, 2023

@erayarslan What's your team's opinion regarding this issue?

@erayarslan (Member) commented:

In this project we are following the path of the Couchbase Elasticsearch connector, which stores checkpoints on Couchbase.

But we are aware of the workload when a checkpoint commit triggers, and we will implement solutions to reduce it.

@Du-Li (Author) commented Mar 23, 2023

How about implementing an optional checkpointing mechanism that saves to kafka?

@erayarslan (Member) commented:

We already have an abstraction for metadata here; currently there are filesystem and Couchbase implementations, and the default is Couchbase.
I think a Kafka metadata implementation is a good idea for the kafka connector project. At least we can give users the freedom to choose.

erayarslan added the enhancement (New feature or request) label Mar 23, 2023
@Du-Li (Author) commented Mar 23, 2023

The other Couchbase connector (kafka-connect-couchbase) keeps state info in Kafka. I started from there but found its performance unsatisfactory, so I reimplemented a CB connector using your libraries, and the e2e latencies immediately improved 20x, down to the 100ms level. It would be ideal if you also implemented the Kafka metadata option; otherwise my project couldn't go to production. As said above, CDC is meant to be read-only, so we are not allowed to write into the source database.

@erayarslan (Member) commented:

Yes, we are aware of the Java connector's behaviour, but on the other hand the Elasticsearch connector stores checkpoints on Couchbase.
I think the approaches simply differ here, but we are okay with a Kafka metadata feature.
Thank you for the feedback; we will implement this soon.

@Du-Li (Author) commented Mar 23, 2023

Thanks for your support. What timeline shall we expect?

@erayarslan (Member) commented:

It will be in our planning this week.

erayarslan added a commit that referenced this issue Mar 24, 2023
@erayarslan (Member) commented:

Hi @Du-Li,
with go-kafka-connect-couchbase v0.0.25 you can now use the Kafka metadata feature. Our example has been updated with it.

Thanks for the idea; I hope it solves your requirements.
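For anyone picking this up later: enabling the feature should just be a matter of switching the connector's metadata store from Couchbase to Kafka in the config. A minimal sketch in the same key=value style used elsewhere in this thread; the key names and the topic name below are placeholders, so check the project's updated example config for the authoritative ones:

```properties
# illustrative placeholders - see the repository's example for the real key names
metadata.type=kafka                    # assumed switch away from the default Couchbase metadata store
metadata.topic=<your-metadata-topic>   # hypothetical compacted topic that holds the checkpoints
```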

@Du-Li (Author) commented Mar 24, 2023

Awesome. Very impressed. Thank you. I will try it out and let you know.

@Du-Li (Author) commented Mar 28, 2023

@erayarslan I have tested the kafka metadata feature. It worked perfectly. Thank you!

However, after I restarted the dcp-kafka connector pods (k8s statefulset membership) on the fly, the connectors were not able to resume from where they stopped, while their CPUs stayed at 100% utilization. My checkpoint type was manual. Do you know what might have caused this problem and how to fix it?

@erayarslan (Member) commented Mar 28, 2023

@Du-Li thank you for testing the Kafka metadata feature; I am glad it worked for you.
I tried your configuration on our Kubernetes cluster but could not reproduce the CPU utilization; it worked perfectly fine.
Maybe you can upgrade to the latest version of go-kafka-connect-couchbase, currently v0.0.35; we made some improvements on restoring streams, rollback, etc.

Or you can share the latest connector logs so we can diagnose the issue.

@Du-Li (Author) commented Mar 29, 2023

The version I tested was already v0.0.35. The logs were not meaningful: all the connectors just stopped after printing the initialization messages such as "dcp stream connected", and kept spinning the CPUs at 100% without doing anything visible.

@Du-Li (Author) commented Mar 29, 2023

In general, what settings are available for checkpoints, and what difference do they make? I cannot tell from your README.

@erayarslan (Member) commented:

Only manual and auto options are available for checkpoints, but I don't think this is a checkpoint issue.
How many items were produced to your Kafka metadata offset topic? Can you tell?
When the application boots, the Kafka metadata feature consumes the whole offset topic to figure out where the last stream finished.
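To put a rough number on that: since the offset topic is compacted (as noted below) and checkpoints are tracked per vbucket, a fully compacted topic should settle at roughly one record per vbucket, on the order of the 1024 vbuckets mentioned later in this thread; between compaction passes, however, it can hold many times that, and every connector instance replays all of it on startup.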

@Du-Li (Author) commented Mar 29, 2023

gotcha!

When using the CB metadata option, I noticed that the metadata topic data rate was around 12k RPS, as I noted above, so I guess the Kafka metadata option has a similar data rate. I just logged in to one of my Kafka brokers to count the messages and the broker crashed/restarted; it's perhaps too much to process from a single node.

I was running 10 dcp-kafka connector pods. When they are restarted, each of them might be reading the entire metadata topic from beginning to end, which is huge. That's perhaps why they all hang there using 100% CPU.

If my guess is right, some optimization would be required. Pod restarts are unavoidable in k8s for all kinds of reasons, and the system wouldn't be useful if a restart makes the pod hang.

@erayarslan (Member) commented Mar 29, 2023

Because of this we use a compacted topic for offsets; Debezium and Kafka itself use the same approach.
So Kafka metadata is fine, but I think you need to tune the .commit() frequency, which depends on our Kafka batch producer configuration.

You can tune it with these configs:

kafka.producerBatchSize
kafka.producerBatchTickerDuration

On the other hand, I want to improve our offset consuming logic; 100% CPU utilization is not good.

@Du-Li (Author) commented Mar 29, 2023

My settings were like

kafka.producerBatchSize=50
kafka.producerBatchTickerDuration=50ms

Does it make sense to you?

@erayarslan (Member) commented Mar 29, 2023

producerBatchTickerDuration means the connector will flush messages every 50ms and commit offsets.
producerBatchSize means that when the batch producer queue holds more than 50 messages before the 50ms ticker fires, the connector will flush messages and commit offsets.

The project is still under development and not yet stable, so I think this is a documentation issue; we need to improve our documentation. :/

So in your case we need to increase these, but the size and tickerDuration configs alone are not enough; I think we need to implement a max-bytes kind of config to avoid OOM for big batches.

Currently you need to set these based on your average per-message size.

Maybe:

kafka.producerBatchSize=100
kafka.producerBatchTickerDuration=10s

Meanwhile, we will make the improvements I mentioned above.
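As a sanity check on those numbers: at the roughly 1000 events/sec mentioned later in this thread, kafka.producerBatchSize=100 means the size trigger fires about 10 times per second, well before a 10s ticker, so producerBatchTickerDuration mostly acts as a low-traffic fallback; and since every flush also commits offsets, a larger batch size directly reduces checkpoint traffic to the metadata topic.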

@erayarslan (Member) commented Mar 29, 2023

TIL: in kafka-connect-couchbase the default kafka.producerBatchSize is 2000 (here).

@Du-Li (Author) commented Mar 29, 2023

@erayarslan Thanks for your explanations. Actually, I tried different values in load testing but they didn't seem to make any difference.

@erayarslan (Member) commented:

@Du-Li did you clean the old metadata topic, or give a different topic name to create a new one?

@Du-Li (Author) commented Mar 29, 2023

I reused the old metadata topic without cleaning.

@erayarslan (Member) commented Mar 29, 2023

The Kafka metadata feature always consumes the whole metadata topic at application start.
We talked above about how to keep the metadata topic smaller.

@Du-Li (Author) commented Mar 29, 2023 via email

@erayarslan (Member) commented:

Because of this our metadata topic has cleanup.policy=compact; otherwise metadata would be lost, so we cannot set a retention policy.
But we can trigger more compaction via the Kafka configs of the metadata topic.

We are already working on this.
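For reference, compaction aggressiveness on an existing metadata topic can also be nudged from the Kafka side with standard topic-level settings; a sketch of the relevant properties (the 2 MB segment size mirrors what the connector itself sets in the next comments, and the dirty-ratio value is only an illustration):

```properties
cleanup.policy=compact           # required so only the latest checkpoint per key is retained
segment.bytes=2097152            # ~2 MB segments roll sooner, so compaction can run more often
min.cleanable.dirty.ratio=0.1    # compact once 10% of the log is dirty (Kafka's default is 0.5)
```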

@erayarslan (Member) commented:

@Du-Li with v0.0.36 we decreased segment.bytes of the metadata topic to increase how often compaction runs.
Can you upgrade and remove the old metadata topic, and let the application create a new one?

@Du-Li (Author) commented Mar 31, 2023

I have tested the new version. It worked fine although I haven't measured how much difference it makes. Thank you!

@erayarslan (Member) commented:

Many thanks for your contributions!

@Du-Li (Author) commented Mar 31, 2023

@erayarslan Can you help estimate the size of the checkpoint messages in Kafka? Suppose there are on average 1000 CB DCP events per second and each event has three copies in Kafka. What's the size of each checkpoint message, and how many checkpoint messages are generated per second?

@erayarslan (Member) commented Mar 31, 2023

It depends on when compaction triggers, so I cannot estimate it perfectly, but we decreased segment.bytes of the metadata topic to 2 MB here, which is lower than the Java Couchbase Kafka connector's default.

So there should be nothing to worry about.

@Du-Li (Author) commented Mar 31, 2023

I need capacity estimates for my Kafka cluster. Can you give me a rough idea of how much space (message size and data rate) the checkpoint topic will need? It doesn't have to be very accurate. Thanks. @erayarslan

@erayarslan (Member) commented:

I saw compaction trigger when the metadata topic size was around 3-5 MB in my test cluster, but that is not a guarantee. I think 100 MB is a safe allowance for the Kafka metadata topic.

@erayarslan (Member) commented Mar 31, 2023

For the data rate, it depends on your batch size. I think you can calculate it like this:

checkpoints per second to the Kafka metadata topic = events per second / producerBatchSize × 1024

1024 is the number of affected vbuckets; I assumed they were all affected.
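Plugging in the numbers from this thread as a worked example: at 1000 events/sec with kafka.producerBatchSize=100, the formula gives 1000 / 100 × 1024 ≈ 10,240 checkpoint messages per second. That is a ceiling based on the assumption that every commit touches all 1024 vbuckets; commits that touch fewer vbuckets write correspondingly fewer checkpoint messages, and compaction keeps the topic itself small regardless.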
