Better Broker Connection Management #15

Closed
eapache opened this issue Aug 15, 2013 · 29 comments

@eapache
Contributor

eapache commented Aug 15, 2013

Following up on #7, #9, #10, and #13: our connection management still has problems.

First some things to know about Kafka that complicate the issue:

  • Broker metadata requests do not seem required to return all the brokers they know about (it is not documented which ones they do return, but I suspect it is only the subset of brokers that are leading a partition whose data is also in the response).
  • The user only specifies (host, port) pairs while Kafka metadata returns (host, port, brokerId) triples, with no guarantee that the host matches the user-specified hostname, and with no apparent way to query a broker's own brokerId.

Now some of the issues with our current implementation:

  • If the user provides three addresses, only the last of which is valid, we wait for the first two connections to completely fail when fetching metadata before we try the third. Since the TCP connection timeout can be very long (many minutes) this can take a long time. Trying them all in parallel and using the first valid one would be better.
  • Once we've had an address fail, we discard it forever. Over the lifetime of a long-running cluster, all of the nodes may be down at one point or another, meaning that eventually we may 'run out' of nodes and abort.

Thoughts on potential future design:

  • We should connect to brokers lazily, i.e. only when we actually want to talk to them, not just when we know they exist.
  • Since there is only ever one leader for a partition, if the connection fails it doesn't make sense to just try another broker. Therefore the caller should decide to proceed, which means that the client's leader function shouldn't guarantee that the broker it returns is connected, only connecting.
  • However, when fetching metadata, any broker will do, so the any function should guarantee that the broker it returns is actually connected. Walking through bad brokers to find a good one could be slow, but connecting to all the brokers just to pick one and drop the others is gross. We could try to connect to one, and if it hasn't succeeded in 5 seconds, start another, etc. That should cover the common case, since if it hasn't responded after 5 seconds it probably won't, and if it does then we'll drop it right away, oh well. More importantly, we won't spawn hundreds of connections if the first broker does respond normally.
  • User-provided addresses should be managed separately (which they already are) since they don't have brokerIds. They're only useful for metadata requests, so they should be disconnected entirely 99% of the time. If one of them fails, we should probably just mark it as recently-failed, and retry it after some time has passed. Only if all user-supplied nodes have recently failed (and we have no metadata from the cluster itself) should we abort somehow.

Open Questions:

  • If a user-supplied broker is used to satisfy a call to any, when should it be disconnected? The extraBroker field in client currently is a hack to work around this, and it still leaves one connection floating around for no reason. Presumably disconnectBroker could be used and made smarter? However, if a metadata-supplied broker is used, calling disconnectBroker will pull it out from underneath other code that could be using it as a leader. Do we need reference-counting?
  • There is currently no way to tell that a Broker is 'Connecting'; you can only tell that the lock is currently held. This means that if two concurrent leader calls would return the same broker and both call Open, the second one will block when we don't want it to. Do we need a second lock in the brokers? That gets really messy...
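To make the reference-counting question concrete, here is a rough sketch of what it might look like. Everything in it (`conn`, `entry`, `registry`, `acquire`, `release`) is a hypothetical stand-in rather than existing Sarama code, and dialing and closing are reduced to flags.

```go
package main

import (
	"fmt"
	"sync"
)

// conn is a hypothetical stand-in for a broker connection.
type conn struct {
	addr   string
	closed bool
}

type entry struct {
	c    *conn
	refs int
}

// registry hands out shared connections keyed by broker ID and closes a
// connection only when its last user releases it.
type registry struct {
	mu      sync.Mutex
	entries map[int32]*entry
}

func newRegistry() *registry {
	return &registry{entries: make(map[int32]*entry)}
}

// acquire returns the shared connection for a broker, creating it on first use.
func (r *registry) acquire(id int32, addr string) *conn {
	r.mu.Lock()
	defer r.mu.Unlock()
	e, ok := r.entries[id]
	if !ok {
		e = &entry{c: &conn{addr: addr}} // a real client would dial here
		r.entries[id] = e
	}
	e.refs++
	return e.c
}

// release drops one reference and closes the connection when none remain.
func (r *registry) release(id int32) {
	r.mu.Lock()
	defer r.mu.Unlock()
	e, ok := r.entries[id]
	if !ok {
		return
	}
	e.refs--
	if e.refs == 0 {
		e.c.closed = true // a real client would close the socket here
		delete(r.entries, id)
	}
}

func main() {
	reg := newRegistry()
	leader := reg.acquire(1, "kafka1:9092")   // held by a producer
	metadata := reg.acquire(1, "kafka1:9092") // held by a metadata request

	reg.release(1) // metadata request is done
	fmt.Println("same connection:", leader == metadata, "closed:", leader.closed)

	reg.release(1) // producer is done too
	fmt.Println("closed after last release:", leader.closed)
}
```

The metadata caller releasing its reference no longer pulls the connection out from under the producer, at the price of every caller having to pair each `acquire` with a `release`.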
eapache pushed a commit that referenced this issue Aug 27, 2013
Also add a more comprehensive comment to disconnectBroker().

This is the last piece of #23 that doesn't heavily depend on getting #15 right
first.
@eapache
Contributor Author

eapache commented Aug 29, 2013

As suggested by somebody at the review this afternoon, I have filed a ticket with Kafka asking why not all brokers seem to be returned from a "get all metadata" request.

https://issues.apache.org/jira/browse/KAFKA-1033

@tyro89

tyro89 commented Aug 30, 2013

After some digging we discovered this is due to Kafka not querying ZK for the most recent data; instead, each broker uses its local "belief" of what the cluster looks like. The reason it is not querying ZK seems to be mainly the following patch, https://issues.apache.org/jira/browse/KAFKA-901, where there were worries about overloading ZK in the event of failures.

@bouk

bouk commented Sep 17, 2013

I'll just paste the mail I sent to Evan:


I'm going to be working on Sarama and specifically #15, so I thought I'd consult you for a bit, since you worked on it the most. I looked at the code of the scala library and the way they seem to implement managing the brokers is as follows:

  • The metadata gets refreshed when it can't find the leader for the partition it wants to send to, but it also checks every (configurable) number of seconds.
  • The metadata only gets refreshed from the initial broker pool (those you give to the producer when you construct it), and the new pool completely replaces the current known brokers. The metadata gets refreshed from the initial brokers one by one, in random order. The initial broker list never gets thrown out, so you never run out of brokers automatically. If it can't get the needed brokers it'll 'back off' for a few seconds and try again for a certain number of times. If it still can't find it afterwards it'll just outright fail.

To clarify a bit:

There are basically two broker pools: the initial/seed pool, and the pool that it actually uses and connects to. The initial pool is used for metadata requests only, and the brokers that it gets from those requests are the ones used for actually producing/consuming. Of course, these two pools can overlap. The initial pool is frozen.

I think copying this behavior is the way to go.

@fw42
Contributor

fw42 commented Sep 17, 2013

Thanks for looking into this @bouk :-)

@tobi

tobi commented Sep 17, 2013

Nice work @bouk

@eapache
Contributor Author

eapache commented Sep 17, 2013

Good things we should definitely copy:

  • configurable timer-based metadata refresh (in addition to whenever it appears stale)
  • don't throw out failed connections, back off instead
  • eventually outright fail after backing off a certain (configurable) amount

An issue I see with this design: if the subset of brokers provided by the user goes down, it fails to get metadata even if other connections are still up. Ideally the user should only have to provide two or three brokers for bootstrap, not the entire (potentially large) cluster.

And follow-up questions: are broker connections maintained always, even when not in use? Is there any logic to deal with this efficiently, or is it just assumed that the cluster size is small enough for this to be negligible in terms of file-handles and performance? Is there any logic to deal with long TCP timeouts, or could a broker in a bad state cause the entire library to block for 2+ minutes trying to get metadata?

@eapache
Contributor Author

eapache commented Sep 17, 2013

(LinkedIn runs some very large clusters, so I would be surprised if their official client simply kept all of those connections open all of the time.)

@sirupsen
Contributor

An issue that seems related we encountered in the wild:

We started a few clients running Sarama on a lot of servers and left them running for a day. When we went back and tested them the next day, each client still seemed to think it had a connection, but for some reason it couldn't send off messages and didn't crash when trying. Any ideas why this could happen @eapache?

@bouk

bouk commented Sep 17, 2013

The user does provide only a few bootstrap brokers, so we'd have to make sure that at least one of those is online. It then finds the relevant brokers from that set. I saw that Sarama currently requests the metadata for all topics and partitions, which is not needed. The Scala implementation just gets the leaders of all the partitions for the topics it needs and connects to them if needed, leaving the connection open until the metadata is refreshed. At that point it seems to close the connection until it needs to send something to it again.

So to recap: if you want to send something to a certain topic, it checks if it knows the leaders (if it doesn't it gets them) then it checks whether it is connected to the leader of the partition it wants to send to (connects if it isn't) then it finally sends.

So, it maintains the minimum number of connections needed.
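The recap above could be sketched like this. It is a toy model, not the Scala client or Sarama: `partition`, `producer` and the three function fields are hypothetical hooks standing in for the metadata lookup, the dial, and the actual produce request.

```go
package main

import "fmt"

type partition struct {
	topic string
	id    int32
}

// producer keeps a leader cache and connects to a broker only when it
// actually has something to send to it.
type producer struct {
	leaders   map[partition]int32 // cached leader broker ID per partition
	connected map[int32]bool      // brokers we currently hold a connection to

	// Hypothetical hooks standing in for real network operations.
	lookupLeader func(p partition) (int32, error)
	connect      func(brokerID int32) error
	transmit     func(brokerID int32, msg string) error
}

func (pr *producer) send(p partition, msg string) error {
	// 1. Do we know the leader? If not, fetch it.
	leader, ok := pr.leaders[p]
	if !ok {
		var err error
		if leader, err = pr.lookupLeader(p); err != nil {
			return fmt.Errorf("finding leader for %s/%d: %w", p.topic, p.id, err)
		}
		pr.leaders[p] = leader
	}
	// 2. Are we connected to that leader? If not, connect now.
	if !pr.connected[leader] {
		if err := pr.connect(leader); err != nil {
			return fmt.Errorf("connecting to broker %d: %w", leader, err)
		}
		pr.connected[leader] = true
	}
	// 3. Finally, send.
	return pr.transmit(leader, msg)
}

func main() {
	pr := &producer{
		leaders:      map[partition]int32{},
		connected:    map[int32]bool{},
		lookupLeader: func(p partition) (int32, error) { fmt.Println("metadata lookup"); return 2, nil },
		connect:      func(id int32) error { fmt.Println("connecting to broker", id); return nil },
		transmit:     func(id int32, msg string) error { fmt.Println("sent", msg, "to broker", id); return nil },
	}
	p := partition{topic: "events", id: 0}
	pr.send(p, "first")  // triggers lookup and connect
	pr.send(p, "second") // reuses the cached leader and connection
}
```

Only brokers that are actually written to ever get a connection, which is how the number of open connections stays minimal.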

@eapache
Contributor Author

eapache commented Sep 17, 2013

@sirupsen

I'm not sure why this could happen. The only thing I can think of is that the far end went away, and so the TCP stack is blocking trying retransmissions, but assuming the cluster is still up then even if it bounced it should send a RST immediately. If you can reproduce, take a traffic dump and see what shows up.

Alternatively, it could be some peculiar deadlock in the library, I don't know how you'd debug that.

@eapache
Contributor Author

eapache commented Sep 17, 2013

@bouk Not quite minimal, because if I get a leader, then use it, then no longer need it, that connection never gets closed? Or does it close all open connections every time it refreshes metadata?

Also curious: Sarama has maintained a fairly clean distinction between the Client object doing broker management, and the Producer/Consumer objects doing actual send/receives. Does the Scala library have the same separation, or do the Producer/Consumer classes have a bit more "say" in how brokers are managed, when connections are closed etc?

@sirupsen
Contributor

@eapache We checked the Kafka log, and we could see lines like "lost connection to [..]", where [..] was the ip of the servers the producer ran on. Have you run into any behavior that would leave Kafka killing stale connections? It seems like weird behavior to me.

@bouk

bouk commented Sep 17, 2013

It closes all connections of a topic each time that topic is refreshed. It refreshes topic by topic, or multiple at once if you send multiple messages at once (which sarama doesn't support yet I think).

It seems the scala library does something else for producers and consumers, I'd have to dig a bit deeper into how Kafka works to understand how the consumer manages the connections. The producer does as I described though

@eapache
Contributor Author

eapache commented Sep 17, 2013

@sirupsen Don't know why Kafka would kill stale connections. TCP doesn't do keep-alives by default (but it can), so I guess it's possible that one of their undocumented features is to kill apparently stale connections. Sarama should receive a RST and be able to safely reconnect in that case though.

@eapache
Contributor Author

eapache commented Sep 17, 2013

@bouk If I have two producers on different topics that happen to go to the same broker, does that end up as two connections in the Scala client then? One of the benefits of Sarama's current design is that the two described producers will share a connection. However, this means that if one of them closes all its connections to refresh metadata, it would drop the connection out from under the other producer.

@eapache
Contributor Author

eapache commented Sep 17, 2013

@sirupsen Kafka config page mentions "producer.purgatory.purge.interval.requests" and "fetch.purgatory.purge.interval.requests" so maybe it does something weird to handle misbehaving clients? There isn't a lot of detail on the documentation page.

@bouk

bouk commented Sep 17, 2013

It does keep one connection per broker.

Bouke


@bouk

bouk commented Sep 17, 2013

The consumers in the scala library are zookeeper based, so they do something else entirely

@bouk

bouk commented Sep 18, 2013

I implemented some of the changes bouk@0016f3d

Note that a metadata request currently just stops when it can't connect to any of the seed brokers. I think that should be replaced by a 'back off' and retry, just like when a leader is being elected. That would require an API change because it would need a new config for the client which says how long it should back off (or should we just make backing off for election and waiting for a seed broker the same?)

@eapache
Contributor Author

eapache commented Sep 18, 2013

The great thing about Go is that a new config doesn't count as a breaking API change since unspecified fields in a struct literal default to zero.

However, with the current changes I feel like we're just exchanging one set of corner-case issues for another. I don't see a lot of actual benefit I'm afraid.

@bouk

bouk commented Sep 18, 2013

@eapache what do you mean by "I feel like we're just exchanging one set of corner-case issues for another."? I feel like copying the 'reference' behavior is probably the best strategy.

Note that my code currently hangs on the tests, this is because they expect that the client opens a connection to every broker it finds, which it does not

@eapache
Contributor Author

eapache commented Sep 18, 2013

@bouk The current sarama design has a set of cases where it doesn't behave well. The scala client's design has a different set of cases where it doesn't behave well. We can certainly mine the scala client for ideas, but blindly copying their design just means we're exchanging the issues with our design for the issues with their design. If we're going to do that, we need some sort of argument for why their issues are somehow "better" than the ones we've already got, otherwise it's just code churn for no reason.

Ideally we can look at both versions and design something new that behaves well in all of the cases we've come up with, though that is obviously more work. I suspect such a design would involve explicitly reference-counting the brokers.

@sirupsen
Contributor

@eapache Which issues do you think there are with the official approach?

@eapache
Contributor Author

eapache commented Sep 18, 2013

@bouk

bouk commented Sep 18, 2013

"An issue I see with this design: if the subset of brokers provided by the user goes down, it fails to get metadata even if other connections are still up. Ideally the user should only have to provide two or three brokers for bootstrap, not the entire (potentially large) cluster."

Sure, you could make it check the other known brokers but I think it's best to just go with the suggested behavior.

"@bouk If I have two producers on different topics that happen to go to the same broker, does that end up as two connections in the Scala client then? One of the benefits of Sarama's current design is that the two described producers will share a connection. However, this means that if one of them closes all its connections to refresh metadata, it would drop the connection out from under the other producer."

Yep that's a valid issue and I'm sure the scala lib doesn't just drop it, I'll have to consult it some more

@eapache
Contributor Author

eapache commented Sep 18, 2013

"Sure, you could make it check the other known brokers but I think it's best to just go with the suggested behaviour."

Why? What benefit in behaviour or complexity or speed or ... does it provide?

Also I don't really see the scala lib as representing "suggested" behaviour. It's a reference implementation, sure, but the protocol supports half a dozen tricks it doesn't even make use of, so I want to stay away from just blindly copying it.

@bouk

bouk commented Sep 19, 2013

The Scala lib uses a lock to make sure that a broker doesn't get closed while messages are being sent, just like Sarama already does, so that's OK.

burke added a commit that referenced this issue Dec 18, 2013
This is a partial workaround for #15. Really, broker management should
be completely rearchitected, but today is not the day for that.
@eapache
Contributor Author

eapache commented Feb 24, 2015

This issue is rather stale, and a lot has changed, so here's an update:

Several of the issues raised were fixed long ago. The following are no longer problems:

  • "Once we've had an address fail, we discard it forever."
  • "the client's leader function shouldn't guarantee that the broker it returns is connected, only connecting."
  • "User-provided addresses should be managed separately"

Several of the issues are low-priority enhancements that should probably be split into their own tickets:

The only remaining non-trivial design question that is still open here is regarding how we bounce broker connections, and I went into more depth on that in the following comment: #23 (comment)

I'm leaving this ticket open to track that specific issue (specifically: the semantics of disconnectBroker are annoying, and whatever replaces it should be public). The other issues can be split out if anybody actually cares enough about them to do it.

@eapache
Contributor Author

eapache commented Mar 17, 2015

Lazy broker connections have been implemented, as has the sane bouncing of broker connections. disconnectBroker still exists but it is now entirely internal to the client, and isn't a problem child anymore. This ticket can be closed.
