-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Introduce new load manager API #303
Changes from 1 commit
1c46f5b
bbdf123
1f85e00
713ff38
5e343c9
4f97664
b7ec533
81b4725
edf6f4e
925ffca
8ae61c1
9b5358b
653a853
2203768
db12618
0d4e74d
aa2d580
358d1ff
1ea109e
31d60cb
1410008
92028f9
ba9c032
348aba3
bf79da2
1efe869
dbca10a
96c0d86
67b1fe5
9e39cc7
9b2e59c
403ee01
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -137,6 +137,14 @@ if [ "$COMMAND" == "produce" ]; then | |
exec $JAVA $OPTS com.yahoo.pulsar.testclient.PerformanceProducer --conf-file $PULSAR_PERFTEST_CONF "$@" | ||
elif [ "$COMMAND" == "consume" ]; then | ||
exec $JAVA $OPTS com.yahoo.pulsar.testclient.PerformanceConsumer --conf-file $PULSAR_PERFTEST_CONF "$@" | ||
elif [ "$COMMAND" == "monitor" ]; then | ||
exec $JAVA $OPTS com.yahoo.pulsar.testclient.BrokerMonitor "$@" | ||
elif [ "$COMMAND" == "new-monitor" ]; then | ||
exec $JAVA $OPTS com.yahoo.pulsar.testclient.NewBrokerMonitor "$@" | ||
elif [ "$COMMAND" == "simulation-server" ]; then | ||
exec $JAVA $OPTS com.yahoo.pulsar.testclient.LoadSimulationServer "$@" | ||
elif [ "$COMMAND" == "simulation-controller" ]; then | ||
exec $JAVA $OPTS com.yahoo.pulsar.testclient.LoadSimulationController "$@" | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. also update the |
||
elif [ "$COMMAND" == "help" ]; then | ||
pulsar_help; | ||
else | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -293,3 +293,30 @@ keepAliveIntervalSeconds=30 | |
|
||
# How often broker checks for inactive topics to be deleted (topics with no subscriptions and no one connected) | ||
brokerServicePurgeInactiveFrequencyInSeconds=60 | ||
|
||
# Number of samples to use for short term time window | ||
numShortSamples=50 | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Group all the settings by using a common prefix. eg: something like |
||
|
||
# Number of samples to use for long term time window | ||
numLongSamples=1000 | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. try not to abbreviate too much the config variable names. For example this could be called |
||
|
||
# How often in seconds to update the broker data | ||
brokerDataUpdateIntervalSeconds=60 | ||
|
||
# How often in seconds to update the bundle data | ||
bundleDataUpdateIntervalSeconds=60 | ||
|
||
# Default throughput in to assume for new bundles | ||
defaultMsgThroughputIn=50000 | ||
|
||
# Default throughput out to assume for new bundles | ||
defaultMsgThroughputOut=50000 | ||
|
||
# Default message rate in to assume for new bundles | ||
defaultMsgRateIn=50 | ||
|
||
# Default message rate out to assume for new bundles | ||
defaultMsgRateOut=50 | ||
|
||
# Name of load manager to use | ||
loadManagerName=SimpleLoadManager | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Use full class name |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -30,7 +30,7 @@ | |
* Pulsar service configuration object. | ||
* | ||
*/ | ||
public class ServiceConfiguration implements PulsarConfiguration{ | ||
public class ServiceConfiguration implements PulsarConfiguration { | ||
|
||
/***** --- pulsar configuration --- ****/ | ||
// Zookeeper quorum connection string | ||
|
@@ -196,10 +196,14 @@ public class ServiceConfiguration implements PulsarConfiguration{ | |
private boolean loadBalancerEnabled = false; | ||
// load placement strategy | ||
private String loadBalancerPlacementStrategy = "weightedRandomSelection"; // weighted random selection | ||
// load placement secondary strategy (used to silently test an alternate strategy) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think you missed to remove this line |
||
private String loadBalancerSecondaryStrategy = null; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. do we really need |
||
// are all bundle placement operations forwarded to a lead broker | ||
private boolean loadBalancerIsCentralized = false; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we don't need this field. right now, we derive |
||
// Percentage of change to trigger load report update | ||
private int loadBalancerReportUpdateThresholdPercentage = 10; | ||
// maximum interval to update load report | ||
private int loadBalancerReportUpdateMaxIntervalMinutes = 15; | ||
private int loadBalancerReportUpdateMaxIntervalMinutes = 1; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. By lowering this interval to 1min, every broker is going to update its own load report every 1min, even if there were no substantial changes in the traffic. Whenever 1 broker updates the z-node, all the other broker will get the watch and reload that load report and redo the ranking. That could be a bit expensive and it was the reason to wait up to 15min to update, in the case the load doesn't change more than X% There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This makes sense, I will revert the default back to 15 minutes. |
||
// Frequency of report to collect | ||
private int loadBalancerHostUsageCheckIntervalMinutes = 1; | ||
// Load shedding interval. Broker periodically checks whether some traffic | ||
|
@@ -724,6 +728,22 @@ public String getLoadBalancerPlacementStrategy() { | |
return this.loadBalancerPlacementStrategy; | ||
} | ||
|
||
public void setLoadBalancerSecondaryStrategy(String secondaryStrategy) { | ||
this.loadBalancerSecondaryStrategy = secondaryStrategy; | ||
} | ||
|
||
public String getLoadBalancerSecondaryStrategy() { | ||
return this.loadBalancerSecondaryStrategy; | ||
} | ||
|
||
public void setLoadBalancerIsCentralized(boolean isCentralized) { | ||
this.loadBalancerIsCentralized = isCentralized; | ||
} | ||
|
||
public boolean getLoadBalancerIsCentralized() { | ||
return this.loadBalancerIsCentralized; | ||
} | ||
|
||
public int getLoadBalancerReportUpdateThresholdPercentage() { | ||
return loadBalancerReportUpdateThresholdPercentage; | ||
} | ||
|
@@ -941,4 +961,117 @@ public String getReplicatorPrefix() { | |
public void setReplicatorPrefix(String replicatorPrefix) { | ||
this.replicatorPrefix = replicatorPrefix; | ||
} | ||
|
||
|
||
// Configurations for new load manager API | ||
|
||
// Number of samples to use for short term time window | ||
private int numShortSamples = 50; | ||
|
||
// Number of samples to use for long term time window | ||
private int numLongSamples = 1000; | ||
|
||
// How often in seconds to update the broker data | ||
private long brokerDataUpdateIntervalSeconds = 60; | ||
|
||
// How often in seconds to update the bundle data | ||
private long bundleDataUpdateIntervalSeconds = 60; | ||
|
||
// Default throughput in to assume for new bundles | ||
private double defaultMsgThroughputIn = 50000; | ||
|
||
// Default throughput out to assume for new bundles | ||
private double defaultMsgThroughputOut = 50000; | ||
|
||
// Default message rate in to assume for new bundles | ||
private double defaultMsgRateIn = 50; | ||
|
||
// Default message rate out to assume for new bundles | ||
private double defaultMsgRateOut = 50; | ||
|
||
// Name of load manager to use | ||
private String loadManagerName = "SimpleLoadManager"; | ||
|
||
// Name of placement strategy to use for new loadbalancer API. | ||
private String newPlacementStrategyName = "LeastLongTermMessageRate"; | ||
|
||
public int getNumShortSamples() { | ||
return numShortSamples; | ||
} | ||
|
||
public void setNumShortSamples(int numShortSamples) { | ||
this.numShortSamples = numShortSamples; | ||
} | ||
|
||
public int getNumLongSamples() { | ||
return numLongSamples; | ||
} | ||
|
||
public void setNumLongSamples(int numLongSamples) { | ||
this.numLongSamples = numLongSamples; | ||
} | ||
|
||
public long getBrokerDataUpdateIntervalSeconds() { | ||
return brokerDataUpdateIntervalSeconds; | ||
} | ||
|
||
public void setBrokerDataUpdateIntervalSeconds(long brokerDataUpdateIntervalSeconds) { | ||
this.brokerDataUpdateIntervalSeconds = brokerDataUpdateIntervalSeconds; | ||
} | ||
|
||
public long getBundleDataUpdateIntervalSeconds() { | ||
return bundleDataUpdateIntervalSeconds; | ||
} | ||
|
||
public void setBundleDataUpdateIntervalSeconds(long bundleDataUpdateIntervalSeconds) { | ||
this.bundleDataUpdateIntervalSeconds = bundleDataUpdateIntervalSeconds; | ||
} | ||
|
||
public double getDefaultMsgThroughputIn() { | ||
return defaultMsgThroughputIn; | ||
} | ||
|
||
public void setDefaultMsgThroughputIn(double defaultMsgThroughputIn) { | ||
this.defaultMsgThroughputIn = defaultMsgThroughputIn; | ||
} | ||
|
||
public double getDefaultMsgThroughputOut() { | ||
return defaultMsgThroughputOut; | ||
} | ||
|
||
public void setDefaultMsgThroughputOut(double defaultMsgThroughputOut) { | ||
this.defaultMsgThroughputOut = defaultMsgThroughputOut; | ||
} | ||
|
||
public double getDefaultMsgRateIn() { | ||
return defaultMsgRateIn; | ||
} | ||
|
||
public void setDefaultMsgRateIn(double defaultMsgRateIn) { | ||
this.defaultMsgRateIn = defaultMsgRateIn; | ||
} | ||
|
||
public double getDefaultMsgRateOut() { | ||
return defaultMsgRateOut; | ||
} | ||
|
||
public void setDefaultMsgRateOut(double defaultMsgRateOut) { | ||
this.defaultMsgRateOut = defaultMsgRateOut; | ||
} | ||
|
||
public String getLoadManagerName() { | ||
return loadManagerName; | ||
} | ||
|
||
public void setLoadManagerName(String loadManagerName) { | ||
this.loadManagerName = loadManagerName; | ||
} | ||
|
||
public String getNewPlacementStrategyName() { | ||
return newPlacementStrategyName; | ||
} | ||
|
||
public void setNewPlacementStrategyName(String newPlacementStrategyName) { | ||
this.newPlacementStrategyName = newPlacementStrategyName; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,51 @@ | ||
package com.yahoo.pulsar.broker; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. it will require to have copy-right License here. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I am not sure why this diff is not outdated: the copy-right is present if you look at Files Changed. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sure, we can see now. |
||
|
||
import java.util.Map; | ||
import java.util.concurrent.ConcurrentHashMap; | ||
|
||
/** | ||
* Data class containing three components comprising all the data available for the leader broker about other brokers: | ||
* - The local broker data which is written to ZooKeeper by each individual broker (LocalBrokerData). | ||
* - The time average bundle data which is written to ZooKeeper by the leader broker (TimeAverageBrokerData). | ||
* - The preallocated bundles which are not written to ZooKeeper but are maintained by the leader broker | ||
* (Map<String, BundleData>). | ||
*/ | ||
public class BrokerData { | ||
private LocalBrokerData localData; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. While that is true, I think pushing the new fields into There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ok |
||
private TimeAverageBrokerData timeAverageData; | ||
private Map<String, BundleData> preallocatedBundleData; | ||
|
||
/** | ||
* Initialize this BrokerData using the most recent local data. | ||
* @param localData The data local for the broker. | ||
*/ | ||
public BrokerData(final LocalBrokerData localData) { | ||
this.localData = localData; | ||
timeAverageData = new TimeAverageBrokerData(); | ||
preallocatedBundleData = new ConcurrentHashMap<>(); | ||
} | ||
|
||
public LocalBrokerData getLocalData() { | ||
return localData; | ||
} | ||
|
||
public void setLocalData(LocalBrokerData localData) { | ||
this.localData = localData; | ||
} | ||
|
||
public TimeAverageBrokerData getTimeAverageData() { | ||
return timeAverageData; | ||
} | ||
|
||
public void setTimeAverageData(TimeAverageBrokerData timeAverageData) { | ||
this.timeAverageData = timeAverageData; | ||
} | ||
|
||
public Map<String, BundleData> getPreallocatedBundleData() { | ||
return preallocatedBundleData; | ||
} | ||
|
||
public void setPreallocatedBundleData(Map<String, BundleData> preallocatedBundleData) { | ||
this.preallocatedBundleData = preallocatedBundleData; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,66 @@ | ||
package com.yahoo.pulsar.broker; | ||
|
||
import com.yahoo.pulsar.common.policies.data.loadbalancer.NamespaceBundleStats; | ||
|
||
/** | ||
* Data class comprising the short term and long term historical data for this bundle. | ||
*/ | ||
public class BundleData extends JSONWritable { | ||
// Short term data for this bundle. The time frame of this data is determined by the number of short term samples | ||
// and the bundle update period. | ||
private TimeAverageMessageData shortTermData; | ||
|
||
// Long term data for this bundle. The time frame of this data is determined by the number of long term samples | ||
// and the bundle update period. | ||
private TimeAverageMessageData longTermData; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since BundleData is only maintained in memory by the leader, we will loose that info when leader restarts. Isn't it useful to have bundle specific historic data preserved across restart? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It is maintained on ZooKeeper also in |
||
|
||
// For JSON only. | ||
public BundleData(){} | ||
|
||
/** | ||
* Initialize the bundle data. | ||
* @param numShortSamples Number of short term samples to use. | ||
* @param numLongSamples Number of long term samples to use. | ||
*/ | ||
public BundleData(final int numShortSamples, final int numLongSamples) { | ||
shortTermData = new TimeAverageMessageData(numShortSamples); | ||
longTermData = new TimeAverageMessageData(numLongSamples); | ||
} | ||
|
||
/** | ||
* Initialize this bundle data and have its histories default to the given stats before the first sample is | ||
* received. | ||
* @param numShortSamples Number of short term samples to use. | ||
* @param numLongSamples Number of long term samples to use. | ||
* @param defaultStats The stats to default to before the first sample is received. | ||
*/ | ||
public BundleData(final int numShortSamples, final int numLongSamples, final NamespaceBundleStats defaultStats) { | ||
shortTermData = new TimeAverageMessageData(numShortSamples, defaultStats); | ||
longTermData = new TimeAverageMessageData(numLongSamples, defaultStats); | ||
} | ||
|
||
/** | ||
* Update the historical data for this bundle. | ||
* @param newSample The bundle stats to update this data with. | ||
*/ | ||
public void update(final NamespaceBundleStats newSample) { | ||
shortTermData.update(newSample); | ||
longTermData.update(newSample); | ||
} | ||
|
||
public TimeAverageMessageData getShortTermData() { | ||
return shortTermData; | ||
} | ||
|
||
public void setShortTermData(TimeAverageMessageData shortTermData) { | ||
this.shortTermData = shortTermData; | ||
} | ||
|
||
public TimeAverageMessageData getLongTermData() { | ||
return longTermData; | ||
} | ||
|
||
public void setLongTermData(TimeAverageMessageData longTermData) { | ||
this.longTermData = longTermData; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,31 @@ | ||
package com.yahoo.pulsar.broker; | ||
|
||
import com.fasterxml.jackson.annotation.JsonIgnore; | ||
import com.fasterxml.jackson.core.JsonProcessingException; | ||
import com.yahoo.pulsar.common.util.ObjectMapperFactory; | ||
|
||
/** | ||
* Helper class used to conveniently convert a data class to a JSON. | ||
*/ | ||
public class JSONWritable { | ||
|
||
/** | ||
* Get the JSON of this object as a byte[]. | ||
* @return A byte[] of this object's JSON. | ||
* @throws JsonProcessingException | ||
*/ | ||
@JsonIgnore | ||
public byte[] getJsonBytes() throws JsonProcessingException { | ||
return ObjectMapperFactory.getThreadLocal().writeValueAsBytes(this); | ||
} | ||
|
||
/** | ||
* Get the JSON of this object as a String. | ||
* @return A String of this object's JSON. | ||
* @throws JsonProcessingException | ||
*/ | ||
@JsonIgnore | ||
public String getJsonString() throws JsonProcessingException { | ||
return ObjectMapperFactory.getThreadLocal().writeValueAsString(this); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is the difference between
BrokerMonitor
andNewBrokerMonitor
?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The difference is that one monitors data when
SimpleLoadManagerImpl
is used and the other monitors data whenModularLoadManagerImpl
is used. I have renamed them toSimpleLoadManagerBrokerMonitor
andModularLoadManagerBrokerMonitor
to better reflect the difference.