
Add partitioned transactional source #705

Closed

Conversation

charlibot
Contributor

By adding a partitioned transactional source, a new producer will be created per topic-partition when hooked up with the sink or flow. If the transactional.id is based on the topic-partition, we can distribute the workload across multiple instances without getting ProducerFencedExceptions.
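The scheme described here could be sketched as follows; the function name and id format are hypothetical illustrations, not the PR's actual implementation:

```scala
// Sketch: derive a transactional.id per topic-partition from a base id
// configured by the user. Because every instance computes the same id for a
// given partition, Kafka's zombie fencing applies no matter which instance
// the partition lands on after a rebalance.
// `transactionalIdFor` and the id format are assumptions for illustration.
def transactionalIdFor(baseTransactionalId: String, topic: String, partition: Int): String =
  s"$baseTransactionalId-$topic-$partition"

val id = transactionalIdFor("my-app", "orders", 3)
// id == "my-app-orders-3"
```

Since the id is a pure function of the topic-partition, two instances never share a transactional.id for different partitions, which is what avoids the fencing collisions.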

This was based on reviewing the comments in this thread: #420 (review)

@ennru
Member

ennru commented Jan 11, 2019

Thank you for adding this. Let's ask @seglo to review it.

@charlibot
Contributor Author

I just tried this with some load, starting one instance then another after a few seconds. I'm getting ProducerFencedException on the first instance because the producers in the second instance (with the same transactional.id) are being created before the first has finished producing from the revoked partitions. Will have to resolve this before it is ok to merge.

@seglo
Contributor

seglo commented Jan 11, 2019

@charlibot Thanks for this contribution. I'm looking forward to having this merged!

I just tried this with some load, starting one instance then another after a few seconds. I'm getting ProducerFencedException on the first instance because the producers in the second instance (with the same transactional.id) are being created before the first has finished producing from the revoked partitions. Will have to resolve this before it is ok to merge.

I think the path forward here would be to listen for revoked partitions and somehow trigger a commit from the revoked partitions handler (TopicPartitionsRevoked). Partition assignments as part of a consumer group rebalance won't occur until this handler completes, so that should prevent your second instance from running before the first has resolved the transaction in some way.
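A rough pure-Scala sketch of that handshake (all names hypothetical, not the Alpakka Kafka API): the revoked-partitions handler blocks on a promise that the stream completes once its transaction is committed, so the rebalance cannot hand the partition to another instance while a transaction is still open.

```scala
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._

// Hypothetical sketch of blocking the rebalance until the in-flight
// transaction is resolved. In the real Kafka client this would live inside
// ConsumerRebalanceListener.onPartitionsRevoked.
final class RevocationHandler {
  @volatile private var commitDone: Promise[Unit] = Promise.successful(())

  // Called by the stream when a transaction is opened for the partition.
  def transactionStarted(): Promise[Unit] = {
    val p = Promise[Unit]()
    commitDone = p
    p // the stream completes this once the transaction is committed
  }

  // Called from the revoked-partitions handler: block the rebalance until
  // the pending commit finishes (or time out).
  def onPartitionsRevoked(): Unit =
    Await.result(commitDone.future, 10.seconds)
}
```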

@charlibot charlibot force-pushed the transactional-partitioned-source branch from c9abe79 to 6160e9f on January 12, 2019 00:41
@charlibot
Contributor Author

@seglo thanks for the pointer. I had to use the handler directly from the sub source logic because the rebalance listener approach with actors is using tell and so not waiting for the TopicPartitionsRevoked code block to complete.

The transactional sub source now emits a promise along with the topic-partition and source. This promise should be inserted into the sink or flow and is completed when that producer stage completes. Now the sub source can safely shut down transactional streams and wait for the commits to complete when partitions are revoked.

At the moment all sub sources are stopped, since rebalancing seems to cause all partitions to be revoked. Therefore, on reassignment, the partitions that remain assigned will have to recreate their sources (not sure if this is too much of an issue?). Ideally, this logic would be moved to the assignment phase, stopping only those sub sources whose partitions have been reassigned. That would require making TransactionalProducerStageLogic initialize transactions lazily, which I didn't look into too much.

Looking forward to your feedback. I'm not fully committed to this promise-based approach, so any other ideas are more than welcome!
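The promise-based approach described above might look roughly like this; the type and names are hypothetical stand-ins, with a plain List in place of an Akka Streams Source:

```scala
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._

// Hypothetical shape of the emitted element: alongside each topic-partition
// and its source, the sub source hands out a Promise that the producer stage
// completes once it has committed its final transaction.
final case class PartitionedSource[Msg](
  topic: String,
  partition: Int,
  messages: List[Msg],          // stand-in for Source[Msg, NotUsed]
  producerDone: Promise[Unit]   // completed by the producer stage
)

// Sub source side: on revocation, wait for the producer to finish.
def awaitProducer(s: PartitionedSource[_]): Unit =
  Await.result(s.producerDone.future, 10.seconds)

// Producer side: signal completion after the last commit.
def producerFinished(s: PartitionedSource[_]): Unit =
  s.producerDone.success(())
```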

@seglo
Contributor

seglo commented Jan 13, 2019

Great. I'll do a more thorough review on Monday.

@seglo thanks for the pointer. I had to use the handler directly from the sub source logic because the rebalance listener approach with actors is using tell and so not waiting for the TopicPartitionsRevoked code block to complete.

That's interesting. IMO this is a bug to not block when using these handlers since their primary use case is to gracefully handle a partition being revoked from a group member during a rebalance. We should either change the rebalance listener to use the ask pattern or add a new feature which uses ask and waits for an acknowledgement. Your workaround sounds fine to me for this PR.

From the Kafka consumer javadocs for ConsumerRebalanceListener:

It is guaranteed that all consumer processes will invoke onPartitionsRevoked prior to any process invoking onPartitionsAssigned. So if offsets or other state is saved in the onPartitionsRevoked call it is guaranteed to be saved by the time the process taking over that partition has their onPartitionsAssigned callback called to load the state.

https://kafka.apache.org/20/javadoc/org/apache/kafka/clients/consumer/ConsumerRebalanceListener.html

@ennru
Member

ennru commented Jan 14, 2019

@seglo I agree that the non-blocking rebalance event notification is not that useful. I did not find a good way to do something about it yet, please see #539 (comment) for the forces on that...

@seglo
Contributor

seglo commented Jan 14, 2019

Thanks for the pointer @ennru. IMO the greatest value of the partitioned transactional source is to easily distribute transactional workloads across consumer groups with more than one member. Therefore it's important to find a way to synchronously handle the group rebalance.

Since transactions are involved, we could work around this by using an Akka Streams RestartSource, which would restart the stream until we stop getting ProducerFencedExceptions. But the clean way would be to commit or roll back the transaction, shut down the sub source, and make the group member that the partition was re-assigned to wait for its assignment.
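The RestartSource workaround can be illustrated with a pure-Scala stand-in (hypothetical names; a real implementation would use akka.stream.scaladsl.RestartSource with backoff, and Kafka's actual ProducerFencedException):

```scala
// Stand-in for org.apache.kafka.common.errors.ProducerFencedException.
final class FencedException extends RuntimeException("producer fenced")

// Keep restarting the stream body until it stops failing with a fencing
// error, up to a restart budget. If the budget is exhausted the exception
// propagates, as the catch guard no longer matches.
def runUntilNotFenced[A](restartsLeft: Int)(body: () => A): A =
  try body()
  catch {
    case _: FencedException if restartsLeft > 0 =>
      runUntilNotFenced(restartsLeft - 1)(body)
  }
```

The restarts stop succeeding only once the other instance's revoked producers have gone away, which is exactly why it is a workaround rather than the clean, synchronous solution.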

I suggest we continue the discussion in #539 and come up with a new API that makes this possible and then leverage that for this contribution.

Contributor

@seglo seglo left a comment

Hey @charlibot. I changed my mind and decided to do an in-depth review before trying to come to consensus on #539, as I suggested earlier.

While reviewing this PR I thought of some fairly major issues that will be left to the user to figure out when they use the partitioned source (such as the producer partitioning strategy; see my comments in the review). I don't think we can provide implementations to protect the user from these issues; otherwise Alpakka Kafka would become more of a heavyweight framework than the lightweight library it is today. At a certain point, users have to weigh the pros and cons of having some of these things automatically handled for them (Kafka Streams/framework) versus having a lot of control (Alpakka Kafka).

I'm interested to hear your thoughts, and those of @ennru and @2m.

Member

@ennru ennru left a comment

Some questions/suggestions.

@charlibot charlibot force-pushed the transactional-partitioned-source branch from 2acdef8 to 594a505 on January 17, 2019 17:35
@charlibot
Contributor Author

Thanks both for your feedback! I'll hold off on a couple of the items until we resolve the producer partitioning issue since they involve that aspect.

@charlibot
Contributor Author

@seglo I've refactored SubSourceLogic and a couple of other things. Could you take another look?

Contributor

@seglo seglo left a comment

This is looking really good! I made a few minor comments.

Also, do you think it's worthwhile to add a test like "signal rebalance events to actor" in IntegrationSpec, which would test the behaviour of the rebalance when some messages have already been produced from a transactional workload with a revoked consumer?

docs/src/main/paradox/transactions.md (outdated, resolved)
core/src/main/scala/akka/kafka/ConsumerSettings.scala (outdated, resolved)
core/src/main/scala/akka/kafka/ConsumerSettings.scala (outdated, resolved)
Member

@ennru ennru left a comment

A few naming suggestions and questions from me.

core/src/main/scala/akka/kafka/ConsumerSettings.scala (outdated, resolved)
core/src/main/resources/reference.conf (outdated, resolved)
core/src/main/scala/akka/kafka/javadsl/Transactional.scala (outdated, resolved)
Member

@ennru ennru left a comment

I came up with a few suggestions and questions...

@ennru
Member

ennru commented Feb 1, 2019

Thank you @charlibot and @seglo for your work with this.
We discussed it and will work more thoroughly with it, but I want to let you know now that this won't be merged into Alpakka Kafka 1.0. It will have to wait until 1.1, as we are already in the release-candidate phase and this touches quite a few things.

@ennru ennru added this to the 1.1 milestone Feb 1, 2019
@seglo
Contributor

seglo commented Feb 1, 2019

Cool. That's exciting :) Thanks for the heads up.

@charlibot
Contributor Author

Hi both, I tried a slightly different approach that uses the rebalance actor. It required making the ListenerCallbacks synchronous with an ask, as suggested in #539 (comment). It also requires the sub sources to emit with a control so we can drain them independently. See: https://github.com/akka/alpakka-kafka/compare/master...charlibot:remove-async-callbacks?expand=1 (TpsExample is a good place to start). WDYT?

@ennru
Member

ennru commented Feb 15, 2019

Sorry, I didn't have the time to look at your new approach, yet. Will do soon.

@ennru
Member

ennru commented Mar 25, 2019

@charlibot I've finally gotten around to looking into your proposal again. We have quite some digging going on, as some users have discovered duplicates in transactional source/sink flows (#756 and #758).
Everything circles around rebalancing (#755), so @szymonm and I have simplified the rebalance listener and the ListenerCallbacks.
That introduced some conflicts with your changes; would you like to rebase?

@charlibot charlibot force-pushed the transactional-partitioned-source branch from f710098 to 49f6386 on March 26, 2019 15:30
@charlibot
Contributor Author

@ennru I just rebased. Unfortunately, since we need to block in the onRevoke callback, I had to undo the nice ListenerCallbacks changes in SubSourceLogic. I noticed some more discussion in #539 which could clean this up once resolved. I'm still leaning toward some of the work in my other branch remove-async-callbacks, but I might be missing something there regarding threads and such.

@ennru
Member

ennru commented Mar 26, 2019

Thank you.
In your remove-async-callbacks branch the (now non-async) callbacks touch state in the stage concurrently from the actor's thread that called poll. That is not allowed.
I'm making some progress on a new take on the rebalancing callbacks which will block, but there is still a long way to go.
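The threading rule being pointed out can be illustrated in plain Scala (hypothetical code, not the Alpakka internals): state owned by a stage is only touched from a single dedicated executor, so a callback running on the consumer's poll thread must hand work over and wait rather than mutate stage state directly.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Single thread standing in for the stage's own execution context.
val stageThread = ExecutionContext.fromExecutorService(Executors.newSingleThreadExecutor())

var stageLocalState = 0 // owned by the stage: only ever touched on stageThread

// Runs on the poll thread: hand the mutation over to the stage's thread
// and block until it has been processed, instead of touching state here.
def onRevokeFromPollThread(): Unit = {
  val handled = Future { stageLocalState += 1 }(stageThread)
  Await.result(handled, 5.seconds)
}
```

In real Akka Streams code the hand-off would go through GraphStageLogic's async callback machinery rather than a raw executor; the sketch only shows why mutating stage state from the poll thread is unsafe.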

@charlibot
Contributor Author

Ah, I see, thanks for the explanation. Let me know if I need to make any more changes.

@ennru
Member

ennru commented Mar 29, 2019

I've just opened #761 which explores synchronous callbacks for rebalancing.

@charlibot charlibot force-pushed the transactional-partitioned-source branch from 49f6386 to b9bcf9e on April 19, 2019 10:12
@ennru
Member

ennru commented Apr 26, 2019

The work on the transactional flows hasn't come to an end, yet. We'll revisit your PR once that is done -- hopefully very soon.

@charlibot
Contributor Author

Hi @ennru, I just updated this PR by removing the promise stuff, since we can reuse the smarter draining logic that was written for the single transactional source.

@ennru
Member

ennru commented May 10, 2019

I see you merged master into this branch. I'm afraid that could give us trouble when merging it back to master. A rebase would be preferred.

@charlibot charlibot force-pushed the transactional-partitioned-source branch 3 times, most recently from 6c2d96b to f5c5c22 on May 10, 2019 16:25
@2m 2m self-assigned this May 16, 2019
@charlibot charlibot force-pushed the transactional-partitioned-source branch from f5c5c22 to 8ece2cf on June 14, 2019 14:48
@brendan-mcadams-exa

Any chance of getting this PR finished and approved? The functionality would be extremely useful.

@seglo
Contributor

seglo commented Aug 13, 2019

I think this PR can be revived now that there's an internal implementation from #761 to handle revoked partitions synchronously.

@ennru
Member

ennru commented Aug 14, 2019

@brendan-mcadams-exa Thank you for pinging this suggested feature.
Could you explain your use case a bit? I wonder if parts of what this kind of source offers would be easier to solve with a proper rebalance handler, which I have been investigating.

@ennru ennru unassigned 2m Aug 14, 2019
@ennru ennru removed this from the 1.1.0-RC1 milestone Sep 3, 2019
@ennru ennru added the core label Sep 17, 2019
Contributor

@raboof raboof left a comment

Getting to use the Kafka transaction fencing feature without having to coordinate assigning transactionalId's to nodes seems really interesting, and running a stream per partition seems like it'd make sense in many cases as well.

Have we checked it's OK to create potentially many Kafka producers?

ProducerConfig.TRANSACTIONAL_ID_CONFIG -> transactionalId,
ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION -> 1.toString
)
val txSettingsToProducer = (txid: String) => {
Contributor

So this creates a producer (with a unique transactionalId) for each partition that is assigned to this node. Isn't that expensive, or is the Kafka client smart about pooling the resources for those multiple producers?

Contributor

@raboof The original idea was to create a producer per partition, but you're right that it could have unintended performance problems for large partition subscriptions. AFAIK the separate instances of KafkaProducer don't share a common resource pool, so they would be managed separately. We don't appear to have a precedent in our docs for using producers per substream with other partitioned sources.

I looked at the Kafka Streams implementation and the original design doc (https://docs.google.com/document/d/1pGZ8xtOOyGwDYgH5vA6h19zOMMaduFK1DAB8_gBYA2c/edit#heading=h.mki3gltx1zw) and they create one KafkaProducer per Task. Each TaskId (1:1 with a Task) appears to represent one assigned partition. The Kafka Streams transactional.id for a Task's producer is set with a pattern of the application id (defined by the user across all instances) and a TaskId. So my initial understanding is that it will indeed create one producer per partition. I think we need to study this implementation more closely to understand whether we need to be smarter about the number of producers we spawn.
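As a sketch of that pattern (our reading of the design doc, not code taken from Kafka Streams; the exact id format there may differ):

```scala
// Hypothetical reconstruction of the Kafka Streams transactional.id scheme
// described above: the user-defined application id combined with a task id
// of the form "<subtopology>_<partition>", giving one producer per
// task/partition.
def streamsStyleTransactionalId(applicationId: String, subtopology: Int, partition: Int): String =
  s"$applicationId-${subtopology}_$partition"
```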

Contributor

I took a look at this again and it does look like a producer is created per task. I'm less clear on whether a task is 1:1 with a partition, but since you can only define one partition per task I don't see how it could be anything else.

This understanding conflicts with the original design doc, so I posed the question to the kafka-users mailing list. Hopefully we'll get an authoritative reply.

https://lists.apache.org/thread.html/491b89840776498009fa773a737bf512a2c144558cce961b4ff8c768@%3Cusers.kafka.apache.org%3E

@raboof
Contributor

raboof commented Oct 4, 2019

I started rebasing this PR - it now compiles, but there are still tests failing (https://travis-ci.org/akka/alpakka-kafka/builds/593117356)

@seglo
Contributor

seglo commented Oct 10, 2019

Thanks @raboof . I've started reviewing your branch. I'll follow up in the next week.

@seglo
Contributor

seglo commented Oct 11, 2019

I've created a new PR #930 to continue this work with the goal of including it in Alpakka Kafka 2.0.0.

@seglo seglo closed this Oct 11, 2019
7 participants