Introduce new load manager API #303

Merged
merged 32 commits into from
Mar 30, 2017
Changes from 10 commits
1c46f5b
Introduce new load manager API
bobbeyreese Mar 21, 2017
bbdf123
Fix formatting and style issues
bobbeyreese Mar 22, 2017
1f85e00
Fix imports
bobbeyreese Mar 22, 2017
713ff38
Add back renamed classes
bobbeyreese Mar 22, 2017
5e343c9
Add policies to ModularLoadManagerImpl
bobbeyreese Mar 22, 2017
4f97664
Make system resource usage more influential
bobbeyreese Mar 23, 2017
b7ec533
Fix reflective setting of load manager in tests
bobbeyreese Mar 23, 2017
81b4725
Conform with mockito syntax
bobbeyreese Mar 23, 2017
edf6f4e
Merge branch 'master' of https://github.com/yahoo/pulsar
bobbeyreese Mar 23, 2017
925ffca
Improve formatting and design
bobbeyreese Mar 24, 2017
8ae61c1
Revert to old simulation behavior, use reflective interface design
bobbeyreese Mar 24, 2017
9b5358b
Remove unnecessary async call
bobbeyreese Mar 24, 2017
653a853
Fix formatting
bobbeyreese Mar 24, 2017
2203768
Remove git merge remnant
bobbeyreese Mar 24, 2017
db12618
Merge branch 'master' of https://github.com/yahoo/pulsar
bobbeyreese Mar 27, 2017
0d4e74d
Refactor some classes and put loadManagerClassName in dynamic conf
bobbeyreese Mar 28, 2017
aa2d580
Delete LoadSimulationServer
bobbeyreese Mar 28, 2017
358d1ff
Clean API, share more code
bobbeyreese Mar 28, 2017
1ea109e
Remove redundant synchronized block, make correction to onUpdate
bobbeyreese Mar 28, 2017
31d60cb
Fix policy filter bug
bobbeyreese Mar 29, 2017
1410008
Remove availableActiveBrokers.get() call
bobbeyreese Mar 29, 2017
92028f9
Use connection pooling
bobbeyreese Mar 29, 2017
ba9c032
Remove unused code
bobbeyreese Mar 30, 2017
348aba3
Don't try reading the symlink and converting to absolute path
Mar 30, 2017
bf79da2
Merge branch 'master' of https://github.com/bobbeyreese/pulsar
Mar 30, 2017
1efe869
Remove redundant and unnecessary statments
bobbeyreese Mar 30, 2017
dbca10a
Merge branch 'master' of https://github.com/bobbeyreese/pulsar
bobbeyreese Mar 30, 2017
96c0d86
Add test back to see if path contains /virtual/
Mar 30, 2017
67b1fe5
Merge branch 'master' of https://github.com/bobbeyreese/pulsar
Mar 30, 2017
9e39cc7
Replace System.out statements with log.info
bobbeyreese Mar 30, 2017
9b2e59c
Merge branch 'master' of https://github.com/bobbeyreese/pulsar
bobbeyreese Mar 30, 2017
403ee01
Change %s to {}
bobbeyreese Mar 30, 2017
17 changes: 15 additions & 2 deletions bin/pulsar-perf
@@ -73,8 +73,13 @@ pulsar_help() {
cat <<EOF
Usage: pulsar <command>
where command is one of:
produce Run a producer
consume Run a consumer
produce Run a producer
consume Run a consumer
simple-monitor Continuously receive broker data when using SimpleLoadManagerImpl
modular-monitor Continuously receive broker data when using ModularLoadManagerImpl
Contributor:

Nit-picking: how about naming the two commands like this:

monitor-simple-load-manager
monitor-modular-load-manager

Otherwise the adjective seems to refer to the "monitor" itself.

simulation-server Run a simulation server acting as a Pulsar client
Contributor:

Should we name it simulation-client?

simulation-controller Run a simulation controller to give commands to servers

help This help message

or command is the full name of a class with a defined main() method.
@@ -137,6 +142,14 @@ if [ "$COMMAND" == "produce" ]; then
exec $JAVA $OPTS com.yahoo.pulsar.testclient.PerformanceProducer --conf-file $PULSAR_PERFTEST_CONF "$@"
elif [ "$COMMAND" == "consume" ]; then
exec $JAVA $OPTS com.yahoo.pulsar.testclient.PerformanceConsumer --conf-file $PULSAR_PERFTEST_CONF "$@"
elif [ "$COMMAND" == "simple-monitor" ]; then
exec $JAVA $OPTS com.yahoo.pulsar.testclient.SimpleLoadManagerBrokerMonitor "$@"
elif [ "$COMMAND" == "modular-monitor" ]; then
exec $JAVA $OPTS com.yahoo.pulsar.testclient.ModularLoadManagerBrokerMonitor "$@"
elif [ "$COMMAND" == "simulation-server" ]; then
exec $JAVA $OPTS com.yahoo.pulsar.testclient.LoadSimulationServer "$@"
elif [ "$COMMAND" == "simulation-controller" ]; then
exec $JAVA $OPTS com.yahoo.pulsar.testclient.LoadSimulationController "$@"
Contributor:

Also update pulsar_help() above to add the new commands to the help message.

elif [ "$COMMAND" == "help" ]; then
pulsar_help;
else
30 changes: 30 additions & 0 deletions conf/broker.conf
@@ -293,3 +293,33 @@ keepAliveIntervalSeconds=30

# How often broker checks for inactive topics to be deleted (topics with no subscriptions and no one connected)
brokerServicePurgeInactiveFrequencyInSeconds=60

# Number of samples to use for short term time window
loadManagerNumberOfSamplesShortTermWindow=50

# Number of samples to use for long term time window
loadManagerNumberOfSamplesLongTermWindow=1000

# How often in seconds to update the broker data
loadManagerBrokerDataUpdateIntervalInSeconds=60

# How often in seconds to update the bundle data
loadManagerBundleDataUpdateIntervalInSeconds=60

# Default throughput in to assume for new bundles
loadManagerDefaultMessageThroughputIn=50000

# Default throughput out to assume for new bundles
loadManagerDefaultMessageThroughputOut=50000

# Default message rate in to assume for new bundles
loadManagerDefaultMessageRateIn=50

# Default message rate out to assume for new bundles
loadManagerDefaultMessageRateOut=50

# Name of load manager to use
loadManagerClassName=com.yahoo.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl

# Name of placement strategy to use for new load manager API
loadManagerPlacementStrategyClassName=com.yahoo.pulsar.broker.loadbalance.impl.LeastLongTermMessageRate
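
The loadManagerClassName setting above names the load manager by its fully qualified class name, which suggests the broker instantiates it reflectively. The following is a minimal sketch of that pattern, assuming the configured class has a public no-argument constructor. DemoLoadManager, its two implementations, and LoadManagerLoader are hypothetical stand-ins written for illustration; they are not classes from this PR.

```java
/** Hypothetical helper that turns a configured class name into an instance. */
final class LoadManagerLoader {
    private LoadManagerLoader() {
    }

    /**
     * Instantiate the named class through its no-argument constructor.
     * A bad name or a class of the wrong type is reported as a configuration error.
     */
    static DemoLoadManager create(final String className) {
        try {
            final Class<? extends DemoLoadManager> type =
                    Class.forName(className).asSubclass(DemoLoadManager.class);
            return type.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException | ClassCastException e) {
            throw new IllegalArgumentException("Cannot create load manager: " + className, e);
        }
    }

    public static void main(final String[] args) {
        // In a broker this string would come from loadManagerClassName in broker.conf.
        final String configured = DemoModularLoadManager.class.getName();
        System.out.println(create(configured).describe());
    }
}

/** Hypothetical stand-in for the broker's load manager interface. */
interface DemoLoadManager {
    String describe();
}

/** Hypothetical implementation; not SimpleLoadManagerImpl. */
final class DemoSimpleLoadManager implements DemoLoadManager {
    public DemoSimpleLoadManager() {
    }

    @Override
    public String describe() {
        return "simple";
    }
}

/** Hypothetical implementation; not ModularLoadManagerImpl. */
final class DemoModularLoadManager implements DemoLoadManager {
    public DemoModularLoadManager() {
    }

    @Override
    public String describe() {
        return "modular";
    }
}
```

Wrapping the reflective failures in IllegalArgumentException is a design choice of this sketch: a misspelled class name in the configuration then surfaces as a single, clearly labeled error rather than as one of several checked reflection exceptions.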
@@ -30,7 +30,7 @@
* Pulsar service configuration object.
*
*/
public class ServiceConfiguration implements PulsarConfiguration{
public class ServiceConfiguration implements PulsarConfiguration {

/***** --- pulsar configuration --- ****/
// Zookeeper quorum connection string
@@ -196,6 +196,10 @@ public class ServiceConfiguration implements PulsarConfiguration{
private boolean loadBalancerEnabled = false;
// load placement strategy
private String loadBalancerPlacementStrategy = "weightedRandomSelection"; // weighted random selection
// load placement secondary strategy (used to silently test an alternate strategy)
Contributor:

I think you forgot to remove this line.

private String loadBalancerSecondaryStrategy = null;
Contributor:

Do we really need loadBalancerSecondaryStrategy? Can't we update loadBalancerPlacementStrategy and change the strategy dynamically?

// are all bundle placement operations forwarded to a lead broker
private boolean loadBalancerIsCentralized = false;
Contributor:

I don't think we need this field. Right now we derive loadBalancerIsCentralized from the strategy: if the strategy is leastLoadedServer, loadBalancerIsCentralized is true, otherwise it is false. So each load balancer implements an isCentralized method.
Since we already have the load balancer implementation, LoadBalancerImpl.isCentralized() should return a predefined value and shouldn't be dynamic, right?

// Percentage of change to trigger load report update
private int loadBalancerReportUpdateThresholdPercentage = 10;
// maximum interval to update load report
@@ -724,6 +728,22 @@ public String getLoadBalancerPlacementStrategy() {
return this.loadBalancerPlacementStrategy;
}

public void setLoadBalancerSecondaryStrategy(String secondaryStrategy) {
this.loadBalancerSecondaryStrategy = secondaryStrategy;
}

public String getLoadBalancerSecondaryStrategy() {
return this.loadBalancerSecondaryStrategy;
}

public void setLoadBalancerIsCentralized(boolean isCentralized) {
this.loadBalancerIsCentralized = isCentralized;
}

public boolean getLoadBalancerIsCentralized() {
return this.loadBalancerIsCentralized;
}

public int getLoadBalancerReportUpdateThresholdPercentage() {
return loadBalancerReportUpdateThresholdPercentage;
}
@@ -941,4 +961,118 @@ public String getReplicatorPrefix() {
public void setReplicatorPrefix(String replicatorPrefix) {
this.replicatorPrefix = replicatorPrefix;
}


// Configurations for new load manager API

// Number of samples to use for short term time window
private int loadManagerNumberOfSamplesShortTermWindow = 50;

// Number of samples to use for long term time window
private int loadManagerNumberOfSamplesLongTermWindow = 1000;

// How often in seconds to update the broker data
private long loadManagerBrokerDataUpdateIntervalInSeconds = 60;

// How often in seconds to update the bundle data
private long loadManagerBundleDataUpdateIntervalInSeconds = 60;

// Default throughput in to assume for new bundles
private double loadManagerDefaultMessageThroughputIn = 50000;

// Default throughput out to assume for new bundles
private double loadManagerDefaultMessageThroughputOut = 50000;

// Default message rate in to assume for new bundles
private double loadManagerDefaultMessageRateIn = 50;

// Default message rate out to assume for new bundles
private double loadManagerDefaultMessageRateOut = 50;

// Name of load manager to use
private String loadManagerClassName = "com.yahoo.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl";
Contributor:

Use SimpleLoadManagerImpl.class.getName(), so that it gets updated if the class is renamed.

Author:

There's a bit of an issue with this: pulsar-broker depends on pulsar-broker-common, and Maven doesn't allow circular dependencies. Without package reorganization I'm not sure it can be done this way.

Contributor:

Ok, no problem then

Contributor:

We should place all the fields at the beginning, followed by the getters and setters.


// Name of placement strategy to use for new loadbalancer API.
private String modularPlacementStrategyClassName =
"com.yahoo.pulsar.broker.loadbalance.impl.LeastLongTermMessageRate";

public int getLoadManagerNumberOfSamplesShortTermWindow() {
return loadManagerNumberOfSamplesShortTermWindow;
}

public void setLoadManagerNumberOfSamplesShortTermWindow(int loadManagerNumberOfSamplesShortTermWindow) {
this.loadManagerNumberOfSamplesShortTermWindow = loadManagerNumberOfSamplesShortTermWindow;
}

public int getLoadManagerNumberOfSamplesLongTermWindow() {
return loadManagerNumberOfSamplesLongTermWindow;
}

public void setLoadManagerNumberOfSamplesLongTermWindow(int loadManagerNumberOfSamplesLongTermWindow) {
this.loadManagerNumberOfSamplesLongTermWindow = loadManagerNumberOfSamplesLongTermWindow;
}

public long getLoadManagerBrokerDataUpdateIntervalInSeconds() {
return loadManagerBrokerDataUpdateIntervalInSeconds;
}

public void setLoadManagerBrokerDataUpdateIntervalInSeconds(long loadManagerBrokerDataUpdateIntervalInSeconds) {
this.loadManagerBrokerDataUpdateIntervalInSeconds = loadManagerBrokerDataUpdateIntervalInSeconds;
}

public long getLoadManagerBundleDataUpdateIntervalInSeconds() {
return loadManagerBundleDataUpdateIntervalInSeconds;
}

public void setLoadManagerBundleDataUpdateIntervalInSeconds(long loadManagerBundleDataUpdateIntervalInSeconds) {
this.loadManagerBundleDataUpdateIntervalInSeconds = loadManagerBundleDataUpdateIntervalInSeconds;
}

public double getLoadManagerDefaultMessageThroughputIn() {
return loadManagerDefaultMessageThroughputIn;
}

public void setLoadManagerDefaultMessageThroughputIn(double loadManagerDefaultMessageThroughputIn) {
this.loadManagerDefaultMessageThroughputIn = loadManagerDefaultMessageThroughputIn;
}

public double getLoadManagerDefaultMessageThroughputOut() {
return loadManagerDefaultMessageThroughputOut;
}

public void setLoadManagerDefaultMessageThroughputOut(double loadManagerDefaultMessageThroughputOut) {
this.loadManagerDefaultMessageThroughputOut = loadManagerDefaultMessageThroughputOut;
}

public double getLoadManagerDefaultMessageRateIn() {
return loadManagerDefaultMessageRateIn;
}

public void setLoadManagerDefaultMessageRateIn(double loadManagerDefaultMessageRateIn) {
this.loadManagerDefaultMessageRateIn = loadManagerDefaultMessageRateIn;
}

public double getLoadManagerDefaultMessageRateOut() {
return loadManagerDefaultMessageRateOut;
}

public void setLoadManagerDefaultMessageRateOut(double loadManagerDefaultMessageRateOut) {
this.loadManagerDefaultMessageRateOut = loadManagerDefaultMessageRateOut;
}

public String getLoadManagerClassName() {
return loadManagerClassName;
}

public void setLoadManagerClassName(String loadManagerClassName) {
this.loadManagerClassName = loadManagerClassName;
}

public String getModularPlacementStrategyClassName() {
return modularPlacementStrategyClassName;
}

public void setModularPlacementStrategyClassName(String modularPlacementStrategyClassName) {
this.modularPlacementStrategyClassName = modularPlacementStrategyClassName;
}
}
@@ -0,0 +1,52 @@
package com.yahoo.pulsar.broker;
Contributor:

This file needs the copyright license header.

Author:

I am not sure why this diff is not marked as outdated: the copyright header is present if you look at Files Changed.

Contributor:

Sure, we can see it now.


import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
* Data class containing three components comprising all the data available for the leader broker about other brokers: -
* The local broker data which is written to ZooKeeper by each individual broker (LocalBrokerData). - The time average
* bundle data which is written to ZooKeeper by the leader broker (TimeAverageBrokerData). - The preallocated bundles
* which are not written to ZooKeeper but are maintained by the leader broker (Map<String, BundleData>).
*/
public class BrokerData {
private LocalBrokerData localData;
Contributor:

I think LocalBrokerData is a replacement for LoadReport, so can't we add the extra information to the existing LoadReport and use that? That way we don't have to maintain two classes, and any load manager can deserialize it. Is that correct?

Author:

While that is true, I think pushing the new fields into LoadReport would add unnecessary clutter to the JSON for users of SimpleLoadManagerImpl and vice-versa for ModularLoadManagerImpl. I think they are different enough to keep separate.

Contributor:

ok

private TimeAverageBrokerData timeAverageData;
private Map<String, BundleData> preallocatedBundleData;

/**
* Initialize this BrokerData using the most recent local data.
*
* @param localData
* The data local for the broker.
*/
public BrokerData(final LocalBrokerData localData) {
this.localData = localData;
timeAverageData = new TimeAverageBrokerData();
preallocatedBundleData = new ConcurrentHashMap<>();
}

public LocalBrokerData getLocalData() {
return localData;
}

public void setLocalData(LocalBrokerData localData) {
this.localData = localData;
}

public TimeAverageBrokerData getTimeAverageData() {
return timeAverageData;
}

public void setTimeAverageData(TimeAverageBrokerData timeAverageData) {
this.timeAverageData = timeAverageData;
}

public Map<String, BundleData> getPreallocatedBundleData() {
return preallocatedBundleData;
}

public void setPreallocatedBundleData(Map<String, BundleData> preallocatedBundleData) {
this.preallocatedBundleData = preallocatedBundleData;
}
}
@@ -0,0 +1,78 @@
package com.yahoo.pulsar.broker;

import com.yahoo.pulsar.common.policies.data.loadbalancer.NamespaceBundleStats;

/**
* Data class comprising the short term and long term historical data for this bundle.
*/
public class BundleData extends JSONWritable {
// Short term data for this bundle. The time frame of this data is
// determined by the number of short term samples
// and the bundle update period.
private TimeAverageMessageData shortTermData;

// Long term data for this bundle. The time frame of this data is determined
// by the number of long term samples
// and the bundle update period.
private TimeAverageMessageData longTermData;
Contributor:

Since BundleData is only maintained in memory by the leader, we will lose that information when the leader restarts. Wouldn't it be useful to preserve bundle-specific historical data across restarts?

Author (bobbeyreese, Mar 28, 2017):

It is also maintained in ZooKeeper, under /loadbalance/broker-data (it is basically the replacement for resource quotas). It is written to ZooKeeper via writeBundleDataOnZooKeeper.


// For JSON only.
public BundleData() {
}

/**
* Initialize the bundle data.
*
* @param numShortSamples
* Number of short term samples to use.
* @param numLongSamples
* Number of long term samples to use.
*/
public BundleData(final int numShortSamples, final int numLongSamples) {
shortTermData = new TimeAverageMessageData(numShortSamples);
longTermData = new TimeAverageMessageData(numLongSamples);
}

/**
* Initialize this bundle data and have its histories default to the given stats before the first sample is
* received.
*
* @param numShortSamples
* Number of short term samples to use.
* @param numLongSamples
* Number of long term samples to use.
* @param defaultStats
* The stats to default to before the first sample is received.
*/
public BundleData(final int numShortSamples, final int numLongSamples, final NamespaceBundleStats defaultStats) {
shortTermData = new TimeAverageMessageData(numShortSamples, defaultStats);
longTermData = new TimeAverageMessageData(numLongSamples, defaultStats);
}

/**
* Update the historical data for this bundle.
*
* @param newSample
* The bundle stats to update this data with.
*/
public void update(final NamespaceBundleStats newSample) {
shortTermData.update(newSample);
longTermData.update(newSample);
}

public TimeAverageMessageData getShortTermData() {
return shortTermData;
}

public void setShortTermData(TimeAverageMessageData shortTermData) {
this.shortTermData = shortTermData;
}

public TimeAverageMessageData getLongTermData() {
return longTermData;
}

public void setLongTermData(TimeAverageMessageData longTermData) {
this.longTermData = longTermData;
}
}
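
BundleData keeps a short-term and a long-term history, and its comments say that each window's time frame is set by the sample count and the bundle update period. With the defaults in conf/broker.conf, and assuming one sample per bundle update, 50 samples at a 60-second interval cover 3000 seconds (50 minutes), while 1000 samples cover 60000 seconds (16 hours 40 minutes). The code of TimeAverageMessageData is not shown in this part of the diff, so the class below is only a hypothetical sketch of one way to keep such an average: a running mean that reports a default until the first sample arrives and, once the window is full, gives each new sample a weight of 1/maxSamples. It approximates a sliding window rather than storing every sample.

```java
/**
 * Hypothetical running average over roughly the last maxSamples samples.
 * This is an illustration, not the PR's TimeAverageMessageData.
 */
final class WindowedAverage {
    private final int maxSamples;
    private int numSamples;
    private double average;

    WindowedAverage(final int maxSamples, final double defaultValue) {
        if (maxSamples <= 0) {
            throw new IllegalArgumentException("maxSamples must be positive");
        }
        this.maxSamples = maxSamples;
        // Reported until the first sample arrives; the first sample replaces it entirely.
        this.average = defaultValue;
    }

    /** Fold one sample into the average; the sample count is capped at maxSamples. */
    void update(final double sample) {
        numSamples = Math.min(numSamples + 1, maxSamples);
        average += (sample - average) / numSamples;
    }

    double get() {
        return average;
    }

    /** Approximate time span covered by a window, assuming one sample per update interval. */
    static long windowSeconds(final int samples, final long updateIntervalSeconds) {
        return samples * updateIntervalSeconds;
    }

    public static void main(final String[] args) {
        // Window sizes and default rate taken from conf/broker.conf in this PR.
        final WindowedAverage shortTerm = new WindowedAverage(50, 50.0);
        final WindowedAverage longTerm = new WindowedAverage(1000, 50.0);

        // Hypothetical traffic: a steady message rate followed by a sustained spike.
        for (int i = 0; i < 100; i++) {
            final double rate = i < 50 ? 100.0 : 1000.0;
            shortTerm.update(rate);
            longTerm.update(rate);
        }

        System.out.println("short-term average: " + shortTerm.get());
        System.out.println("long-term average:  " + longTerm.get());
        System.out.println("short window (s):   " + windowSeconds(50, 60));
        System.out.println("long window (s):    " + windowSeconds(1000, 60));
    }
}
```

In this sketch the short-term average moves toward the spike faster than the long-term one, which is the property a placement strategy would rely on when it weighs recent load against historical load.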
@@ -0,0 +1,33 @@
package com.yahoo.pulsar.broker;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.yahoo.pulsar.common.util.ObjectMapperFactory;

/**
* Helper class used to conveniently convert a data class to a JSON.
*/
public class JSONWritable {

/**
* Get the JSON of this object as a byte[].
*
* @return A byte[] of this object's JSON.
* @throws JsonProcessingException
*/
@JsonIgnore
public byte[] getJsonBytes() throws JsonProcessingException {
return ObjectMapperFactory.getThreadLocal().writeValueAsBytes(this);
}

/**
* Get the JSON of this object as a String.
*
* @return A String of this object's JSON.
* @throws JsonProcessingException
*/
@JsonIgnore
public String getJsonString() throws JsonProcessingException {
return ObjectMapperFactory.getThreadLocal().writeValueAsString(this);
}
}