Introduce new load manager API #303
Thank you for submitting this pull request, however I do not see a valid CLA on file for you. Before we can merge this request please visit https://yahoocla.herokuapp.com/ and agree to the terms. Thanks! 😄
I just added a few comments on non-substantial aspects. I will go through it in more depth later.
conf/broker.conf
Outdated
@@ -293,3 +293,30 @@ keepAliveIntervalSeconds=30

# How often broker checks for inactive topics to be deleted (topics with no subscriptions and no one connected)
brokerServicePurgeInactiveFrequencyInSeconds=60

# Number of samples to use for short term time window
numShortSamples=50

Group all the settings by using a common prefix, e.g. something like brokerLoadManagedXYZ...
conf/broker.conf
Outdated
defaultMsgRateOut=50

# Name of load manager to use
loadManagerName=SimpleLoadManager

Use the full class name.
conf/broker.conf
Outdated
numShortSamples=50

# Number of samples to use for long term time window
numLongSamples=1000

Try not to abbreviate the config variable names too much. For example, this could be called brokerLoadManagerNumberOfSamplesLongTermWindow=1000
// Percentage of change to trigger load report update
private int loadBalancerReportUpdateThresholdPercentage = 10;
// maximum interval to update load report
private int loadBalancerReportUpdateMaxIntervalMinutes = 15;
private int loadBalancerReportUpdateMaxIntervalMinutes = 1;

By lowering this interval to 1 minute, every broker is going to update its own load report every minute, even if there were no substantial changes in the traffic.
Whenever one broker updates the z-node, all the other brokers will get the watch, reload that load report, and redo the ranking. That could be a bit expensive, and it was the reason to wait up to 15 minutes to update when the load doesn't change by more than X%.

This makes sense, I will revert the default back to 15 minutes.
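The trade-off discussed above (publish on a significant change, otherwise wait for the maximum interval) can be sketched as follows. This is a minimal illustration, not the broker's actual code: the class `LoadReportUpdatePolicy`, its method names, and the use of a single message-rate figure as the "load" are assumptions made for the example. Only the two settings, a threshold percentage and a maximum interval in minutes, come from the diff.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical helper; names are illustrative and not taken from the Pulsar code base. */
public final class LoadReportUpdatePolicy {
    private final int thresholdPercentage;
    private final long maxIntervalMillis;

    public LoadReportUpdatePolicy(int thresholdPercentage, int maxIntervalMinutes) {
        this.thresholdPercentage = thresholdPercentage;
        this.maxIntervalMillis = TimeUnit.MINUTES.toMillis(maxIntervalMinutes);
    }

    /**
     * Decide whether a broker should rewrite its load report z-node.
     * A write wakes up the watches on every other broker, so it is only
     * done when the load moved noticeably or the report is getting stale.
     */
    public boolean shouldPublish(double lastPublishedRate, double currentRate,
                                 long lastPublishMillis, long nowMillis) {
        // Stale report: publish regardless of how little changed.
        if (nowMillis - lastPublishMillis >= maxIntervalMillis) {
            return true;
        }
        // Avoid dividing by zero when nothing was flowing at the last publish.
        if (lastPublishedRate == 0) {
            return currentRate != 0;
        }
        double changePercent =
                Math.abs(currentRate - lastPublishedRate) * 100.0 / lastPublishedRate;
        return changePercent > thresholdPercentage;
    }
}
```

With a 10% threshold and a 15 minute maximum, a broker whose rate drifts from 100 to 105 msg/s stays silent for up to 15 minutes, while a jump to 120 msg/s is published immediately.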
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;

import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.util.OrderedSafeExecutor;
import org.apache.zookeeper.ZooKeeper;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.zookeeper.*;

Please use the Eclipse formatter profile and do not group the imports.
} catch (KeeperException.NodeExistsException e) {
// ignore the exception, node might be present already
}
}

Thread.sleep(5000);

Is this sleep needed?
LoadReport loadReport = null;
Thread.sleep(5000);

Same here.

"Not enough of primaries [{}] available for namespace - [{}], "
+ "adding shared [{}] as possible candidate owners",
primaries.size(), namespace.toString(), shared.size());
"Not enough of primaries [{}] available for namespace - [{}], "

Please use the Eclipse formatter to minimize formatting diffs.
bin/pulsar-perf
Outdated
@@ -137,6 +137,14 @@ if [ "$COMMAND" == "produce" ]; then
exec $JAVA $OPTS com.yahoo.pulsar.testclient.PerformanceProducer --conf-file $PULSAR_PERFTEST_CONF "$@"
elif [ "$COMMAND" == "consume" ]; then
exec $JAVA $OPTS com.yahoo.pulsar.testclient.PerformanceConsumer --conf-file $PULSAR_PERFTEST_CONF "$@"
elif [ "$COMMAND" == "monitor" ]; then
exec $JAVA $OPTS com.yahoo.pulsar.testclient.BrokerMonitor "$@"

What is the difference between BrokerMonitor and NewBrokerMonitor?

The difference is that one monitors data when SimpleLoadManagerImpl is used and the other monitors data when ModularLoadManagerImpl is used. I have renamed them to SimpleLoadManagerBrokerMonitor and ModularLoadManagerBrokerMonitor to better reflect the difference.
bin/pulsar-perf
Outdated
elif [ "$COMMAND" == "simulation-server" ]; then
exec $JAVA $OPTS com.yahoo.pulsar.testclient.LoadSimulationServer "$@"
elif [ "$COMMAND" == "simulation-controller" ]; then
exec $JAVA $OPTS com.yahoo.pulsar.testclient.LoadSimulationController "$@"

Also update pulsar_help() above to add the new commands to the help message.
Overall this looks very nice. Just a few comments and formatting fixes (use Eclipse with the formatting profile from src/formatter.xml).
Once this is merged, it would also be good to have a document in the "Internal Docs" section explaining the mechanism behind the load manager and, even nicer, some visualization of the load distribution comparing the old and new load managers.
private double loadManagerDefaultMessageRateOut = 50;

// Name of load manager to use
private String loadManagerClassName = "com.yahoo.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl";

Use SimpleLoadManagerImpl.class.getName(), so that it gets updated if the class is renamed.

There's a bit of an issue with this, in that pulsar-broker depends on pulsar-broker-common and Maven doesn't allow circular dependencies. Without package reorganization I'm not sure it can be done this way.

Ok, no problem then.
* ZooKeeper but are maintained by the leader broker (Map<String, BundleData>).
*/
public class BrokerData {
private LocalBrokerData localData;

Formatting seems a bit off; it should be 4 spaces and no tabs.
@@ -201,6 +208,28 @@ public void close() throws PulsarServerException {
}
}

private class LoadManagerWatcher implements Watcher {
public void process(final WatchedEvent event) {
new Thread(() -> {

It's preferable to reuse an existing executor instead of spawning new threads.
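As a rough illustration of this suggestion, the callback can hand its work to an executor that the service already owns instead of creating a thread per event. The sketch below deliberately avoids the ZooKeeper types so that it is self-contained: `SharedExecutorWatcher`, its `process(String)` method, and the example path are stand-ins invented for this example, not the PR's `LoadManagerWatcher`.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for a watcher that reloads the load manager on a change event. */
public final class SharedExecutorWatcher {
    private final ExecutorService executor;
    private final Runnable reloadTask;

    public SharedExecutorWatcher(ExecutorService executor, Runnable reloadTask) {
        this.executor = executor;
        this.reloadTask = reloadTask;
    }

    /** Called on the notification thread: return quickly and let the shared executor do the work. */
    public void process(String eventPath) {
        executor.execute(reloadTask);
    }

    /** Small demonstration: three events are handled by one reused worker thread. */
    public static int demo() {
        ExecutorService shared = Executors.newSingleThreadExecutor();
        AtomicInteger reloads = new AtomicInteger();
        SharedExecutorWatcher watcher = new SharedExecutorWatcher(shared, reloads::incrementAndGet);
        for (int i = 0; i < 3; i++) {
            watcher.process("/example/path"); // hypothetical z-node path
        }
        shared.shutdown();
        try {
            if (!shared.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("executor did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return reloads.get();
    }
}
```

Besides avoiding thread churn, a shared executor bounds how many reloads can run concurrently, which an unbounded `new Thread(...)` per event does not.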
public void process(final WatchedEvent event) {
new Thread(() -> {
try {
LOG.info("Attempting to change load manager");

ServiceConfiguration already supports "dynamic" variables that can be overridden in ZK. You can register to be notified whenever a particular field is updated. Please check with @rdhabalia for more info.
try {
final ServiceConfiguration conf = pulsar.getConfiguration();
final Class<?> loadManagerClass = Class.forName(conf.getLoadManagerClassName());
// Assume there is a constructor with one argument of PulsarService.

Instead of assuming a particular constructor, a common practice with interfaces and reflection is to assume a default constructor and then enforce an init method in the interface:
void initialize(PulsarService service);

That sounds fine. The only problem I would have with that is that I would lose the ability to make many fields final, but that is probably not a big deal, so I will change it.
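The pattern suggested here can be sketched as below. Everything in the block is a simplified stand-in written for illustration: `Service` plays the role of `PulsarService`, and the `LoadManager` interface and `SimpleImpl` class are reduced to the minimum needed to show the reflection step. It is not the code that was merged.

```java
/** Illustrative sketch of "default constructor plus initialize()"; all types here are stand-ins. */
public final class LoadManagerFactorySketch {

    /** Stand-in for PulsarService. */
    public static final class Service {
        final String name;

        public Service(String name) {
            this.name = name;
        }
    }

    /** The interface enforces the init method, so no constructor shape has to be assumed. */
    public interface LoadManager {
        void initialize(Service service);

        String describe();
    }

    public static final class SimpleImpl implements LoadManager {
        // Cannot be final any more: it is assigned in initialize(), not in the constructor.
        private Service service;

        public SimpleImpl() {
        }

        @Override
        public void initialize(Service service) {
            this.service = service;
        }

        @Override
        public String describe() {
            return "SimpleImpl for " + service.name;
        }
    }

    /** Instantiate by class name through the no-argument constructor, then initialize. */
    public static LoadManager create(String className, Service service) {
        try {
            Class<?> clazz = Class.forName(className);
            LoadManager manager = (LoadManager) clazz.getDeclaredConstructor().newInstance();
            manager.initialize(service);
            return manager;
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException("Cannot create load manager " + className, e);
        }
    }
}
```

A caller would write something like `LoadManagerFactorySketch.create(LoadManagerFactorySketch.SimpleImpl.class.getName(), new LoadManagerFactorySketch.Service("broker-1"))`. The cost mentioned in the reply is visible in `SimpleImpl`: the `service` field can no longer be declared `final`.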
private Producer getNewProducer() throws Exception {
while (true) {
try {
return client.createProducerAsync(topic, producerConf).get();

This could be just:
return client.createProducer(topic, producerConf);
if (!stop.get()) {
// The Producer failed due to an exception: attempt to get
// another producer.
producer = getNewProducer();

Once you have a producer, it's good forever. Even if you get an exception, the producer instance is still valid and it will recover by itself.

Hmm, it seems that for whatever reason, I am experiencing issues related to netty/direct memory if I do not attempt to make a new producer:
io.netty.util.IllegalReferenceCountException: refCnt: 0, increment: 1
at io.netty.buffer.AbstractReferenceCountedByteBuf.retain(AbstractReferenceCountedByteBuf.java:63)
at com.yahoo.pulsar.common.compression.CompressionCodecNone.encode(CompressionCodecNone.java:27)
at com.yahoo.pulsar.client.impl.BatchMessageContainer.getCompressedBatchMetadataAndPayload(BatchMessageContainer.java:107)
at com.yahoo.pulsar.client.impl.ProducerImpl.batchMessageAndSend(ProducerImpl.java:1019)
at com.yahoo.pulsar.client.impl.ProducerImpl.doBatchSendAndAdd(ProducerImpl.java:293)
at com.yahoo.pulsar.client.impl.ProducerImpl.sendAsync(ProducerImpl.java:237)
at com.yahoo.pulsar.client.impl.ProducerImpl.sendAsync(ProducerImpl.java:134)
at com.yahoo.pulsar.client.impl.ProducerBase.sendAsync(ProducerBase.java:47)
at com.yahoo.pulsar.testclient.LoadSimulationServer$TradeUnit.start(LoadSimulationServer.java:112)
at com.yahoo.pulsar.testclient.LoadSimulationServer.lambda$handle$5(LoadSimulationServer.java:212)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
at java.lang.Thread.run(Thread.java:745)
As a result, I can end up losing a bunch of bundles in the simulation. I will leave the logic for getting a new producer in place for now and try to find a better solution later.
pulsar-testclient/pom.xml
Outdated
@@ -39,11 +39,26 @@
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.8</version>

Don't specify the artifact version outside the <dependencyManagement> section in the parent pom. In this case just omit the version.
@@ -159,8 +160,7 @@ private ResourceUsage getMemUsage() {
}

private boolean isPhysicalNic(Path path) {
try {
path = Files.isSymbolicLink(path) ? Files.readSymbolicLink(path) : path;
path = Files.isSymbolicLink(path) ? path.toAbsolutePath() : path;

Why remove the try/catch here?

readSymbolicLink can throw IOException, but since it was changed, there is no possibility of a checked exception being thrown. I could change it to catch Exception instead, though this catches more exceptions than in the original code.
// select one of them at the end.
for (String broker : candidates) {
final double score = getScore(loadData.getBrokerData().get(broker), conf);
log.info("{} got score {}", broker, score);

Should this line be logged at debug level instead?
bin/pulsar-perf
Outdated
consume Run a consumer
simple-monitor Continuously receive broker data when using SimpleLoadManagerImpl
modular-monitor Continuously receive broker data when using ModularLoadManagerImpl
simulation-server Run a simulation server acting as a Pulsar client

Should we name it simulation-client?
@@ -0,0 +1,52 @@
package com.yahoo.pulsar.broker;

This file needs the copyright license header here.

I am not sure why this diff is not outdated: the copyright header is present if you look at Files Changed.

Sure, we can see it now.
final String newLoadManagerName =
new String(getZkClient().getData(DYNAMIC_LOAD_MANAGER_ZPATH, this, null));

config.setLoadManagerClassName(newLoadManagerName);
if (lm instanceof SimpleLoadManagerImpl) {
return ((SimpleLoadManagerImpl) lm).getResourceAvailabilityFor(ns).asMap();
} else {
return Collections.emptyMap();

What if we just return null rather than creating a new collection?
@@ -79,6 +83,11 @@
void doNamespaceBundleSplit() throws Exception;

/**
* Determine the broker root.
*/
String getBrokerRoot();

Should we name it getBrokerRootName(), as we are returning the name rather than the root node?
}

// Form a score for a broker using its preallocated bundle data and time
// average data.

Here again, is it possible to describe how we calculate the score?
// wireless nics don't report speed, ignore them.
return false;
}
path = Files.isSymbolicLink(path) ? path.toAbsolutePath() : path;

Shouldn't we call readSymbolicLink if the file isSymbolicLink?
import com.yahoo.pulsar.zookeeper.ZooKeeperDataCache;

public class ModularLoadManagerImpl implements ModularLoadManager, ZooKeeperCacheListener<LocalBrokerData> {
public static final String LOADBALANCE_BROKERS_ROOT = "/loadbalance/new-brokers";

Does this mean we no longer write into /loadbalance/brokers once we enable this load manager? During migration, an old broker still tries to get the broker's URL from the path /loadbalance/brokers when redirecting a lookup request, and if this node is not present the lookup may fail.

This was resolved by having NamespaceService call getBrokerRoot instead of using the hardcoded path.
EDIT: I see now there is little necessity to separate the two broker roots and have removed getBrokerRoot in favor of simply using /loadbalance/brokers.
return broker;
}

private void policyFilter(final ServiceUnitId serviceUnit) {

Is this the same logic as SimpleLoadManagerImpl.getFinalCandidates() for filtering out candidates? If so, should we put it in some common place so that any new load manager can reuse it?

It is very similar, but at the moment the difference in the APIs (Multimap<Long, ResourceUnit> vs. Set<String>) complicates refactoring of this portion of the code significantly.

I had some time today and decided to go ahead and share the code, along with other duplicate code. It makes a somewhat invasive change to SimpleLoadManagerImpl which could possibly impact performance. If there are major concerns there, I will revert to the previous version.

final Set<String> oldBundles = lastLoadReport.getBundles();
final Set<String> newBundles = loadReport.getBundles();
final Set<String> bundleGains = new HashSet<>();

Is it possible to move this collection to class level and reuse it? The load report runs every x minutes, so allocating it each time may contribute a little to GC.
// the ranks are bounded. Otherwise (as is the case in LOADBALANCER_STRATEGY_LEAST_MSG, the ranks are simply
// the total message rate which is in the range [0,Infinity) so they are unbounded. The
// "boundedness" affects how two ranks are compared to see which one is better
boolean unboundedRanks = getLoadBalancerPlacementStrategy().equals(LOADBALANCER_STRATEGY_LEAST_MSG);
synchronized (resourceUnitRankings) {

Since both of the methods accessing this map are synchronized, can we remove this synchronized block?
// Long term data for this bundle. The time frame of this data is determined
// by the number of long term samples
// and the bundle update period.
private TimeAverageMessageData longTermData;

Since BundleData is only maintained in memory by the leader, we will lose that info when the leader restarts. Isn't it useful to have bundle-specific historic data preserved across restarts?

It is also maintained on ZooKeeper in /loadbalance/broker-data (it is basically the replacement for resource quotas). It is written to ZooKeeper via writeBundleDataOnZooKeeper.
*/
@Override
public void onUpdate(final String path, final LocalBrokerData data, final Stat stat) {
scheduler.submit(this::updateAll);

If this is invoked when load data is updated by every broker, shouldn't we update the data for that broker instead of all?

You're right, good catch.
EDIT: Actually, the local data is updated via writeBrokerDataOnZooKeeper, which is called periodically by LoadReportUpdaterTask. This updateAll periodic invocation ensures that the load data is up to date; while we could delegate it only to the leader broker, this could complicate things a little bit because:
- The load manager does not know directly whether it is the leader broker.
- Since it is implemented as a watch we would have to make special stipulations based on whether the broker is the leader broker.
I think that, since updating invokes reads and not writes, it is not so bad for every broker to do it. It is actually similar to SimpleLoadManagerImpl's behavior, since onUpdate updates the rankings for it, for each broker.
* @return The name of the selected broker, as it appears on ZooKeeper.
*/
@Override
public synchronized String selectBrokerForAssignment(final ServiceUnitId serviceUnit) {

Why do we need a synchronized block here?

Without the synchronized block, decisions will be made without the latest preallocated data. If many selections are processed within a short time frame, this could result in the least loaded broker receiving too many assignments. A weighted random procedure could counteract this, but it is a little less reliable and more complicated.
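The argument for synchronizing can be made concrete with a small sketch. It assumes a much simpler model than the PR: each broker has a single numeric load, the selector always picks the lowest one, and the expected load of the new bundle is added to that broker right away (the "preallocated" data). The class `PreallocatingSelector` and its method names are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical least-loaded selector that records preallocated load on every decision. */
public final class PreallocatingSelector {
    // Projected load per broker: reported load plus load preallocated by earlier selections.
    private final Map<String, Double> projectedLoad = new LinkedHashMap<>();

    public PreallocatingSelector(Map<String, Double> reportedLoad) {
        projectedLoad.putAll(reportedLoad);
    }

    /**
     * Pick the least loaded broker and immediately charge it with the expected load.
     * The read and the update form one critical section, so two concurrent callers
     * cannot both see the same broker as least loaded and pile onto it.
     */
    public synchronized String select(double expectedBundleLoad) {
        String best = null;
        double bestLoad = Double.POSITIVE_INFINITY;
        for (Map.Entry<String, Double> entry : projectedLoad.entrySet()) {
            if (entry.getValue() < bestLoad) {
                best = entry.getKey();
                bestLoad = entry.getValue();
            }
        }
        if (best == null) {
            throw new IllegalStateException("no brokers available");
        }
        projectedLoad.put(best, bestLoad + expectedBundleLoad);
        return best;
    }
}
```

Starting from loads of 10 and 12 and assigning bundles of expected load 5, successive calls alternate between the two brokers, because each choice is visible to the next one. Without the lock, a burst of concurrent calls could all read the loads before any update is recorded and all return the first broker.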
@@ -199,6 +199,10 @@
private boolean loadBalancerEnabled = false;
// load placement strategy
private String loadBalancerPlacementStrategy = "weightedRandomSelection"; // weighted random selection
// load placement secondary strategy (used to silently test an alternate strategy)
private String loadBalancerSecondaryStrategy = null;

Do we really need loadBalancerSecondaryStrategy? Can't we update loadBalancerPlacementStrategy and dynamically keep changing the strategy?

// Name of load manager to use
@FieldContext(dynamic = true)
private String loadManagerClassName = "com.yahoo.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl";

We should place all the variables at the beginning, followed by the getters/setters.
// load placement secondary strategy (used to silently test an alternate strategy)
private String loadBalancerSecondaryStrategy = null;
// are all bundle placement operations forwarded to a lead broker
private boolean loadBalancerIsCentralized = false;

I think we don't need this field. Right now, we derive loadBalancerIsCentralized from the strategy: if the strategy is leastLoadedServer then loadBalancerIsCentralized will be true, else false. So, each LoadBalancer implements an isCentralized method.
So, as we already have a loadBalancer implementation, LoadBalancerIml.isCentralized() should have a predefined value and it shouldn't be dynamic, right?
System.out.println("\nBroker Data for " + broker + ":");
System.out.println("---------------");

System.out.println("\nNum Topics: " + localBrokerData.getNumTopics());

Can we please replace System.out.print with a logger?

Is this strictly necessary? This is meant to feed continuous information in a pretty/human-readable way and the info lines are going to add a lot of clutter.

That's true, though the advantage of using a logger is that you can choose to change levels or save to a file through configuration. Also, every line will get a timestamp printed.
private Set<String> bundles;

// The bundles gained since the last invocation of update.
private Set<String> lastBundleGains;

Do we use lastBundleGains in loadManager?

It is not used in the load manager, but it is used in the monitor to alert users to recent bundle gains.
* which are not written to ZooKeeper but are maintained by the leader broker (Map<String, BundleData>).
*/
public class BrokerData {
private LocalBrokerData localData;

I think LocalBrokerData is a replacement for LoadReport, so can't we add more information to the existing LoadReport and use it? That way we don't have to maintain two classes and any load manager can deserialize it. Is that correct?

While that is true, I think pushing the new fields into LoadReport would add unnecessary clutter to the JSON for users of SimpleLoadManagerImpl and vice-versa for ModularLoadManagerImpl. I think they are different enough to keep separate.

ok
Can we create a separate pull request to document the steps to enable/disable the new load manager?
/ newUsage.directMemory.limit)
: 0;
? ((newUsage.directMemory.usage - oldUsage.directMemory.usage) * 100
/ newUsage.directMemory.limit)

Formatting: spaces.

It seems to be formatted correctly according to our formatter.
try {
log.info("Attempting to change load manager");
final LoadManager newLoadManager = LoadManager.create(pulsar);
log.info("Created load manager: {}", className);

Should we just have one info statement after creating the load manager?
@@ -199,6 +199,7 @@
private boolean loadBalancerEnabled = false;
// load placement strategy
private String loadBalancerPlacementStrategy = "weightedRandomSelection"; // weighted random selection
// load placement secondary strategy (used to silently test an alternate strategy)

I think you forgot to remove this line.
bin/pulsar-perf
Outdated
produce Run a producer
consume Run a consumer
simple-monitor Continuously receive broker data when using SimpleLoadManagerImpl
modular-monitor Continuously receive broker data when using ModularLoadManagerImpl

Nit-picking: how about naming the two commands as follows:
monitor-simple-load-manager
monitor-modular-load-manager
Otherwise the adjective seems to refer to the "monitor" itself.
Files.readAllBytes(path.resolve("speed"));
return true;
} catch (Exception e) {
// wireless nics don't report speed, ignore them.

Shouldn't we still be skipping "virtual" NICs?

We decided that, since virtual nics will not be able to resolve speed (?), we may as well let the catch block deal with it. That way, we do not need to resolve symbolic links or get absolute paths.

If you're 100% sure that it works, go ahead 😄 . Otherwise just pull back the if-not-virtual check.
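For reference, the more defensive variant being discussed might look like the sketch below. It rests on assumptions that should be checked on the target hosts: that entries under /sys/class/net are symbolic links, that virtual devices resolve to a path containing "/virtual/", and that reading the "speed" file fails for interfaces that do not report a speed. Note that `Path.toAbsolutePath()` does not follow symbolic links, so the sketch uses `Path.toRealPath()` to resolve them. The class name `NicFilterSketch` and the `demo()` directory layout are made up for the example.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

/** Illustrative sketch only; not the code merged in the PR. */
public final class NicFilterSketch {

    /** True when the path looks like a physical NIC that reports a link speed. */
    public static boolean isPhysicalNic(Path path) {
        try {
            // Resolve the link so that the "/virtual/" check sees the real device path.
            Path resolved = Files.isSymbolicLink(path) ? path.toRealPath() : path;
            if (resolved.toString().contains("/virtual/")) {
                return false; // explicit if-not-virtual check
            }
            // Interfaces that do not report speed fail here and are skipped.
            Files.readAllBytes(resolved.resolve("speed"));
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    /** Builds a throwaway directory tree that mimics three kinds of interfaces. */
    public static boolean[] demo() {
        try {
            Path root = Files.createTempDirectory("nic-sketch");
            Path eth0 = Files.createDirectories(root.resolve("pci0/net/eth0"));
            Path veth0 = Files.createDirectories(root.resolve("virtual/net/veth0"));
            Path wlan0 = Files.createDirectories(root.resolve("pci0/net/wlan0"));
            Files.write(eth0.resolve("speed"), "1000".getBytes(StandardCharsets.UTF_8));
            Files.write(veth0.resolve("speed"), "10000".getBytes(StandardCharsets.UTF_8));
            // wlan0 gets no speed file, standing in for a NIC that does not report one.
            return new boolean[] { isPhysicalNic(eth0), isPhysicalNic(veth0), isPhysicalNic(wlan0) };
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

In the demo the virtual interface does have a readable speed file, which is exactly the case where relying on the catch block alone would let a virtual NIC through.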
Good effort to revive the load balancer Brad & Bobbey 👍
👍
Motivation
The current load balancer implementation has demonstrated poor performance in some scenarios, particularly when doing a cold restart or when the load reaches unexpected heights at some brokers. This PR attempts to first address the cold start issue.
Modifications
com.yahoo.pulsar.testclient.LoadSimulationServer
pulsar-perf simulation-server <options>
com.yahoo.pulsar.testclient.LoadSimulationController
pulsar-perf simulation-controller <options>
com.yahoo.pulsar.testclient.BrokerMonitor
pulsar-perf monitor <options>
com.yahoo.pulsar.testclient.NewBrokerMonitor
and pulsar-perf new-monitor <options>
com.yahoo.pulsar.broker.loadbalance.NewLoadManager
com.yahoo.pulsar.broker.loadbalance.impl.NewLoadManagerImpl
LoadManager
via com.yahoo.pulsar.broker.loadbalance.impl.NewLoadManagerWrapper
com.yahoo.pulsar.broker.loadbalance.BrokerFilter
com.yahoo.pulsar.broker.loadbalance.NewPlacementStrategy
com.yahoo.pulsar.broker.loadbalance.impl.LeastLongTermMessageRate
com.yahoo.pulsar.broker.loadbalance.LoadSheddingStrategy
com.yahoo.pulsar.broker.loadbalance.impl.DeviationShedder
(untested, abstract)
Result