
[improve] [pip] PIP-364: Introduce a new load balance algorithm AvgShedder #22946

Merged
12 commits merged into apache:master on Jun 27, 2024

Conversation

@thetumbled (Member) commented Jun 20, 2024

Implementation PR: #22949

Motivation

The current load balancing algorithms have defects.

Modifications

Introduce a new load balance algorithm AvgShedder.

Verifying this change

  • Make sure that the change passes the CI checks.

(Please pick either of the following options)

This change is a trivial rework / code cleanup without any test coverage.

Does this pull request potentially affect one of the following parts:

If the box was checked, please highlight the changes

  • Dependencies (add or upgrade a dependency)
  • The public API
  • The schema
  • The default values of configurations
  • The threading model
  • The binary protocol
  • The REST endpoints
  • The admin CLI options
  • The metrics
  • Anything that affects deployment

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository:

@github-actions bot added the PIP and doc-not-needed (Your PR changes do not impact docs) labels on Jun 20, 2024
@thetumbled (Member, Author) commented Jun 20, 2024

We will present a series of experimental data to validate the conclusions analyzed in the document.

ThresholdShedder + LeastResourceUsageWithWeight

Environment and Configuration

A 5-node broker cluster with 30 bookies was built.

The relevant configurations for load balancing are as follows:

loadBalancerLoadSheddingStrategy=org.apache.pulsar.broker.loadbalance.impl.ThresholdShedder
loadBalancerLoadPlacementStrategy=org.apache.pulsar.broker.loadbalance.impl.LeastResourceUsageWithWeight

The combination of ThresholdShedder and LeastResourceUsageWithWeight is used with default configuration values, except that bundle split and uniform bundle distribution are turned off.

loadBalancerDistributeBundlesEvenlyEnabled=false
loadBalancerAutoBundleSplitEnabled=false
loadBalancerAutoUnloadSplitBundlesEnabled=false

Over-placement problem

Start three pressure testing tasks:
[screenshots]

After the stress testing tasks started, it took 22 minutes for the cluster to stabilize, and bundle unload was triggered 8 times.

For ease of debugging, some logs were added. The log when bundle unload is first triggered is as follows:

2024-06-11T15:33:42,642+0800 [pulsar-load-manager-1-1] INFO  org.apache.pulsar.broker.loadbalance.impl.ThresholdShedder - brokerAvgResourceUsage: {XXX.83:8081=0.146445592841173, XXX.32:8081=0.1780747564543283, XXX.87:8081=0.13442747117624326, XXX.206:8081=0.28951184996156754, XXX.161:8081=0.11923764428233738}, avgUsage: 0.1735394629431299, threshold: 0.1, minThroughputThreshold: 10.0MB

This log line prints the final score (i.e. computed with the historical scoring algorithm), the average score, and the threshold for all brokers.

2024-06-11T15:33:42,642+0800 [pulsar-load-manager-1-1] INFO  org.apache.pulsar.broker.loadbalance.impl.ThresholdShedder - brokerAvgResourceUsageWithoutHistory: {XXX.83:8081=0.6200548553466797, XXX.32:8081=0.6142524337768555, XXX.87:8081=0.34531837463378906, XXX.206:8081=0.6850704193115235, XXX.161:8081=0.4193758010864258}

This log line prints the intermediate score of each broker (i.e. the current maximum resource utilization, without the historical scoring algorithm). It can be seen that XXX.83:8081, XXX.32:8081, and XXX.206:8081 are high-load brokers, while the other two brokers are low-load.

From the first two log lines, it can be seen that because every broker had a low load before the stress testing tasks started, the scores of all brokers at this point differ significantly from their actual load. Only the score of XXX.206:8081 exceeded the threshold: 28.951184996156755% > 17.35394629431299% + 10.0%.
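
For reference, the shedding trigger visible in this log can be sketched as follows. This is a minimal, illustrative sketch of the condition only, not the actual Pulsar source; the 10% threshold is assumed to come from loadBalancerBrokerThresholdShedderPercentage.

// Minimal sketch of the ThresholdShedder trigger condition shown in the log above.
// Illustrative only; not the actual Pulsar implementation.
public class ThresholdTriggerSketch {
    public static void main(String[] args) {
        double threshold = 0.10;                   // assumed: loadBalancerBrokerThresholdShedderPercentage / 100
        double avgUsage = 0.1735394629431299;      // average historical score of all brokers
        double brokerScore = 0.28951184996156754;  // historical score of XXX.206:8081

        // 28.95% > 17.35% + 10.0%, so only XXX.206:8081 is selected for shedding.
        boolean overloaded = brokerScore > avgUsage + threshold;
        System.out.println("overloaded: " + overloaded);  // prints true
    }
}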

Therefore, executing a bundle unload on it resulted in the following logs:

2024-06-11T15:33:42,642+0800 [pulsar-load-manager-1-1] INFO  org.apache.pulsar.broker.loadbalance.impl.ThresholdShedder - Attempting to shed load on XXX.206:8081, which has max resource usage above avgUsage  and threshold 28.951184996156755% > 17.35394629431299% + 10.0% -- Offloading at least 14.70925448705765 MByte/s of traffic, left throughput 208.25151973726722 MByte/s

A bundle is unloaded, and the placement policy LeastResourceUsageWithWeight is executed immediately:

2024-06-11T15:33:42,663+0800 [pulsar-load-manager-1-1] INFO  org.apache.pulsar.broker.loadbalance.impl.LeastResourceUsageWithWeight - brokerAvgResourceUsageWithWeight:{XXX.83:8081=0.08251018381118773, XXX.32:8081=0.11141611766815185, XXX.87:8081=0.0459994751214981, XXX.206:8081=0.23925241661071778, XXX.161:8081=0.06012571454048156}, avgUsage:0.10786078155040742, diffThreshold:0.1, candidates:[XXX.83:8081, XXX.32:8081, XXX.87:8081, XXX.206:8081, XXX.161:8081]

2024-06-11T15:33:42,663+0800 [pulsar-load-manager-1-1] WARN  org.apache.pulsar.broker.loadbalance.impl.LeastResourceUsageWithWeight - Assign randomly as all 5 brokers are overloaded.

Because every broker's score plus 10 is greater than the average score of 10.7%, the candidate broker list is empty, triggering random allocation among all brokers. This is the issue we described in the doc for LeastResourceUsageWithWeight: the list of candidate brokers can easily be empty, leading to random allocation.
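
To make the failure mode concrete, here is a minimal sketch of the candidate filtering and random fallback described above. It is illustrative only, not the actual LeastResourceUsageWithWeight source; selection among non-empty candidates is simplified to a random pick, whereas the real policy prefers the least-loaded candidates.

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;

// Hedged sketch of the candidate selection described above;
// not the actual org.apache.pulsar.broker.loadbalance.impl.LeastResourceUsageWithWeight code.
public class CandidateSelectionSketch {

    public static String selectBroker(Map<String, Double> brokerScores, double diffThreshold) {
        double avgUsage = brokerScores.values().stream()
                .mapToDouble(Double::doubleValue).average().orElse(0);

        // A broker is a candidate only if its score is at least
        // diffThreshold (e.g. 0.10 = 10 points) below the average score.
        List<String> candidates = new ArrayList<>();
        brokerScores.forEach((broker, score) -> {
            if (avgUsage - score >= diffThreshold) {
                candidates.add(broker);
            }
        });

        // If no broker clears the gap, fall back to all brokers and assign randomly.
        // This is the "Assign randomly as all 5 brokers are overloaded" case in the log.
        List<String> pool = candidates.isEmpty() ? new ArrayList<>(brokerScores.keySet()) : candidates;
        return pool.get(new Random().nextInt(pool.size()));
    }

    public static void main(String[] args) {
        // Scores from the brokerAvgResourceUsageWithWeight log line above; diffThreshold = 0.10.
        Map<String, Double> scores = Map.of(
                "XXX.83:8081", 0.0825, "XXX.32:8081", 0.1114, "XXX.87:8081", 0.0460,
                "XXX.206:8081", 0.2393, "XXX.161:8081", 0.0601);
        System.out.println(selectBroker(scores, 0.10));  // random broker: no candidate clears the gap
    }
}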

2024-06-11T15:33:42,663+0800 [pulsar-load-manager-1-1] INFO  org.apache.pulsar.broker.loadbalance.impl.LeastResourceUsageWithWeight - Selected 5 best brokers: [XXX.83:8081, XXX.32:8081, XXX.87:8081, XXX.206:8081, XXX.161:8081] from candidate brokers: [XXX.83:8081, XXX.32:8081, XXX.87:8081, XXX.206:8081, XXX.161:8081], assign bundle public/default/0x70000000_0x80000000 to broker XXX.83:8081
2024-06-11T15:33:42,663+0800 [pulsar-load-manager-1-1] INFO  org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl - [ThresholdShedder] Unloading bundle: public/default/0x70000000_0x80000000 from broker XXX.206:8081 to dest broker XXX.83:8081

As you can see, the unloaded bundle was assigned to the high-load XXX.83:8081! This is an incorrect load balancing decision.
In fact, the probability of triggering this problem in this experiment is very high and almost inevitable. As shown in the figure below, it has been triggered four times in a row.
[screenshot]

The historical scoring algorithm bears much of the blame for such a high reproduction probability. The selection criterion for candidate brokers is that a broker's score must be 10 points lower than the average score, which means the scores of different brokers need to open up a considerable gap. However, because of the historical scoring algorithm, the scores of all brokers can only slowly approach their true load from around 20, so the gap between the scores of different brokers never widens. As a result, the LeastResourceUsageWithWeight algorithm can only perform random allocation.
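
For context, the historical scoring mentioned here is essentially an exponential moving average of the instantaneous usage. Below is a minimal sketch, assuming the history weight corresponds to loadBalancerHistoryResourcePercentage with its common default of 0.9; it is illustrative, not the exact Pulsar code.

// Hedged sketch of the historical scoring (exponential moving average) described above.
// historyPercentage is assumed to correspond to loadBalancerHistoryResourcePercentage (default 0.9).
public class HistoricalScoreSketch {
    public static void main(String[] args) {
        double historyPercentage = 0.9;
        double score = 0.20;        // score before the stress test (roughly the idle level)
        double actualUsage = 0.62;  // real max resource usage after the stress test starts

        // Each load-report cycle only moves the score 10% of the way toward the real usage,
        // so it takes many cycles before broker scores reflect reality and spread apart.
        for (int cycle = 1; cycle <= 10; cycle++) {
            score = historyPercentage * score + (1 - historyPercentage) * actualUsage;
            System.out.printf("cycle %d: score = %.3f%n", cycle, score);
        }
    }
}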

Over-unloading problem

To increase the load on a single broker, two brokers were shut down, and abnormal load balancing was observed.
[screenshots]

It can be observed that three rounds of load balancing were mainly performed:

  • In the first round, bundles were unloaded from the highest-load broker XXX.206:8081 (yellow line) to XXX.83:8081 (green line). However, bundles were unloaded four times in this round, causing XXX.206:8081 to quickly become the lowest-load broker, i.e. the over-unloading problem.
  • In the second round, bundles were unloaded from the highest-load XXX.32:8081 (blue line) to the similarly high-load XXX.83:8081 and the low-load XXX.206:8081. This time 11 bundles were unloaded, and both the over-unloading and over-placement problems occurred: bundles were mistakenly allocated to the high-load XXX.83:8081, and XXX.32:8081 became the lowest-load broker.
  • In the third round, bundles were unloaded from the highest-load XXX.83:8081 to XXX.32:8081, and the cluster entered a balanced state, taking a total of 30 minutes.

The broker log can provide a deeper understanding of the above process:
[screenshot of broker logs]

It can be seen that during the 10 minutes of the first round of bundle unloading, XXX.206:8081 was consistently judged to be an overloaded broker, and bundles were continuously unloaded from it, ultimately leading to the over-unloading problem. This is because the historical scoring algorithm caused the score of XXX.206:8081 to change slowly: even though it had already unloaded bundles and its actual load had dropped accordingly, its score remained very high, so it kept being judged as an overloaded broker and kept unloading bundles.

In the second round of bundle unloading, bundles were unloaded from the highest-load XXX.32:8081, but the over-placement problem occurred and they were assigned to the high-load broker XXX.83:8081.

2024-06-12T10:24:02,245+0800 [pulsar-load-manager-1-1] INFO  org.apache.pulsar.broker.loadbalance.impl.ThresholdShedder - Attempting to shed load on XXX.32:8081, which has max resource usage above avgUsage  and threshold 78.09468007403726% > 65.79112414711298% + 10.0% -- Offloading at least 14.886936767013715 MByte/s of traffic, left throughput 188.94441491927364 MByte/s

2024-06-12T10:24:02,246+0800 [pulsar-load-manager-1-1] INFO  org.apache.pulsar.broker.loadbalance.impl.LeastResourceUsageWithWeight - brokerAvgResourceUsageWithWeight:{XXX.83:8081=0.6968493632164602, XXX.32:8081=0.6564280053774565, XXX.206:8081=0.5447576150322107}, avgUsage:0.6326783278753757, diffThreshold:0.1, candidates:[XXX.83:8081, XXX.32:8081, XXX.206:8081]
2024-06-12T10:24:02,246+0800 [pulsar-load-manager-1-1] WARN  org.apache.pulsar.broker.loadbalance.impl.LeastResourceUsageWithWeight - Assign randomly as all 3 brokers are overloaded.

2024-06-12T10:24:02,247+0800 [pulsar-load-manager-1-1] INFO  org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl - [ThresholdShedder] Unloading bundle: public/default/0x30000000_0x40000000 from broker XXX.32:8081 to dest broker XXX.83:8081

So far, we have experimentally validated the two core defects of ThresholdShedder+LeastResourceUsageWithWeight:

  • over-placement problem
  • over-unloading problem

Both of these issues are made severe by the historical scoring algorithm. It can also be seen that the load balancing speed of ThresholdShedder+LeastResourceUsageWithWeight is not fast: due to incorrect load balancing decisions, it often takes repeated rounds of load balancing to stabilize.

@thetumbled (Member, Author):

AvgShedder (compared to ThresholdShedder + LeastResourceUsageWithWeight)

Deploy the same testing environment as ThresholdShedder+LeastResourceUsageWithWeight.
The algorithm configuration is as follows:

loadBalancerLoadSheddingStrategy=org.apache.pulsar.broker.loadbalance.impl.AvgShedder
loadBalancerLoadPlacementStrategy=org.apache.pulsar.broker.loadbalance.impl.AvgShedder
loadBalancerAvgShedderHitCountHighThreshold = 2
loadBalancerAvgShedderHitCountLowThreshold = 8
loadBalancerAvgShedderLowThreshold = 15
loadBalancerAvgShedderHighThreshold = 40
maxUnloadPercentage = 0.5
minUnloadMessageThroughput = 10 * 1024 * 1024
minUnloadMessage = 10000

Since we have disabled the consideration of memory usage and direct memory usage, only network card usage and CPU usage are left. Because our network cards perform well, the performance bottleneck lies in the CPU, so brokers are effectively scored by their CPU usage.
Therefore, we add a panel, Range of CPU usage, to show the load difference between the highest- and lowest-load brokers in the cluster.
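
As a rough sketch of why the score reduces to CPU usage here, assume the broker score is the maximum of the weighted resource usages, with the memory and direct-memory weights set to 0; the weight handling below is illustrative, not the exact Pulsar code.

// Hedged sketch: with memory/direct-memory weights set to 0, the weighted maximum
// resource usage effectively degenerates to CPU usage (assuming CPU > NIC usage here).
public class BrokerScoreSketch {
    static double score(double cpu, double nicIn, double nicOut, double memory, double directMemory) {
        double cpuWeight = 1.0, nicWeight = 1.0;
        double memoryWeight = 0.0, directMemoryWeight = 0.0;  // disabled in this experiment

        return Math.max(
                Math.max(cpu * cpuWeight, memory * memoryWeight),
                Math.max(directMemory * directMemoryWeight,
                         Math.max(nicIn * nicWeight, nicOut * nicWeight)));
    }

    public static void main(String[] args) {
        // With a fast NIC, CPU dominates, so the "Range of CPU usage" panel
        // approximates the score gap between the most and least loaded brokers.
        System.out.println(score(0.63, 0.20, 0.25, 0.70, 0.55));  // prints 0.63
    }
}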

Start the pressure testing task:
[screenshots]

As can be seen, after the pressure testing task started, XXX.83 (green line) carried the most traffic and had the highest CPU usage, while XXX.161 (blue line) carried the least traffic and had the lowest CPU usage. The score difference between the two was 63 - 38.5 = 24.5 > 15. Therefore, after 8 consecutive checks (waiting 8 minutes), load balancing was triggered, and XXX.83 and XXX.161 shared the traffic equally.
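
The trigger logic can be sketched as follows, based on a simplified reading of the configuration above (not the actual Pulsar source): the score gap between the highest- and lowest-load brokers must exceed the low threshold (15) for 8 consecutive checks, or the high threshold (40) for 2 consecutive checks, before bundles are swapped between that pair.

// Hedged sketch of AvgShedder's dual-threshold, hit-count based trigger,
// matching the configuration above (illustrative, not the actual implementation).
public class AvgShedderTriggerSketch {
    static int hitCount = 0;

    static boolean shouldTrigger(double maxScore, double minScore) {
        double lowThreshold = 15;   // loadBalancerAvgShedderLowThreshold
        double highThreshold = 40;  // loadBalancerAvgShedderHighThreshold
        int lowHitCount = 8;        // loadBalancerAvgShedderHitCountLowThreshold
        int highHitCount = 2;       // loadBalancerAvgShedderHitCountHighThreshold

        double gap = maxScore - minScore;
        if (gap < lowThreshold) {
            hitCount = 0;           // jitter that stays below the low threshold never triggers
            return false;
        }
        hitCount++;
        // A huge gap triggers quickly (2 hits); a moderate gap must persist longer (8 hits).
        return (gap >= highThreshold && hitCount >= highHitCount)
                || (gap >= lowThreshold && hitCount >= lowHitCount);
    }
}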

[screenshot]
Triggering just this one bundle unload brought the cluster into a stable state, whereas ThresholdShedder+LeastResourceUsageWithWeight took 22 minutes and executed many bundle unloads.

In addition, we also observed load jitter: the CPU usage of XXX.32 suddenly surged to 86.5 and then rapidly dropped back, while its traffic throughput did not change, possibly because of other processes deployed on the machine. Regardless of the reason, the load balancing algorithm should not immediately trigger a bundle unload in this case, and AvgShedder achieved this, demonstrating its ability to cope with load jitter.
[screenshot]

@thetumbled (Member, Author) commented Jun 20, 2024

UniformLoadShedder + LeastLongTermMessageRate

Environment and Configuration

A 4-node broker cluster with 20 bookies was built, but one machine, XXX.34, is heterogeneous: its performance is much better than that of the other three machines.
The relevant configurations for load balancing are as follows:

loadBalancerLoadSheddingStrategy=org.apache.pulsar.broker.loadbalance.impl.UniformLoadShedder
loadBalancerLoadPlacementStrategy=org.apache.pulsar.broker.loadbalance.impl.LeastLongTermMessageRate

The combination of UniformLoadShedder and LeastLongTermMessageRate is used, and most configurations keep their default values. With the defaults, the maximum message rate is allowed to be up to 1.5 times the minimum message rate, and the maximum traffic throughput up to 4 times the minimum traffic throughput.
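
For reference, these ratios are controlled by the UniformLoadShedder thresholds. We believe they correspond to the following broker settings (treat the names and defaults as assumptions; the rate threshold is a percentage difference, the throughput threshold a multiplier):

loadBalancerMsgRateDifferenceShedderThreshold=50
loadBalancerMsgThroughputMultiplierDifferenceShedderThreshold=4

A 50% rate difference corresponds to the 1.5x message-rate ratio, and the multiplier of 4 corresponds to the 4x throughput ratio.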

The bundle split and uniform bundle distribution features are also disabled.

loadBalancerDistributeBundlesEvenlyEnabled=false
loadBalancerAutoBundleSplitEnabled=false
loadBalancerAutoUnloadSplitBundlesEnabled=false

Heterogeneous environment

Start two pressure testing tasks:
[screenshots]

To observe the execution of the UniformLoadShedder algorithm, two additional panels are added:

  • The maximum to minimum ratio of in and out throughput
    max(sum(pulsar_throughput_in+pulsar_throughput_out) by (instance))/min(sum(pulsar_throughput_in+pulsar_throughput_out) by (instance))
    [screenshot]

  • The maximum to minimum ratio of in and out message rates.
    max(sum(pulsar_rate_in+pulsar_rate_out) by (instance))/min(sum(pulsar_rate_in+pulsar_rate_out) by (instance))
    [screenshot]

As can be seen, after 4 rounds of load balancing, both ratios decreased from 2.5 to around 1.2.

[screenshots]

From the perspective of message rate and traffic throughput, this round of load balancing was very successful: the message rate and traffic throughput of the brokers in the cluster were very close, and the cluster reached a stable state within 5 minutes, much faster than ThresholdShedder+LeastResourceUsageWithWeight.

However, by observing the resource utilization metrics, we find that the cluster is actually quite uneven. Due to the much stronger performance of XXX.34, its resource utilization is much lower than that of the other brokers! This wastes resources: if the overall load were increased while remaining evenly distributed, the other brokers would become overloaded while XXX.34 stayed at a low load level, which is not what we hope to see. We hope that XXX.34 will bear more load.
[screenshot]

Load jitter

To simulate a sudden increase or decrease in load, a topic persistent://public/default/testTxn is added. Its production traffic is the same as the other tasks, but the consumer stops after every 1 minute of operation and waits 1 minute before resuming consumption.
As follows:
[screenshots]

From the monitoring, it can be seen that the load balancing algorithm keeps unloading bundles, because the sudden increase or decrease in consumption traffic causes the maximum-to-minimum ratio of message rates to instantly exceed the configured threshold of 1.5, triggering bundle unloading.
[screenshots]

@thetumbled (Member, Author):

AvgShedder (compared to UniformLoadShedder + LeastLongTermMessageRate)

Heterogeneous environment

The environment and stress load are the same as in the previous UniformLoadShedder+LeastLongTermMessageRate tests: machine XXX.34 is heterogeneous, and its performance is much stronger than that of the other three machines.

[screenshots]

It can be observed that machine XXX.34 has significantly higher traffic throughput and message rate than other machines.

[screenshots]

The maximum-to-minimum ratio of message rate and traffic throughput even reaches 11, but this is reasonable: looking at resource utilization, the load on machine XXX.34 is still the lowest!
[screenshot]
It can be seen that the resource utilization of XXX.34 is still less than half that of the other machines. Readers may wish to further shift load from other machines, such as XXX.83, to XXX.34 in order to balance resource utilization further. However, the current AvgShedder algorithm cannot yet achieve this; it is only better than UniformLoadShedder in heterogeneous environments.

Load jitter

Further deploy the consumption task with periodic jitter:
[screenshots]

As you can see, not a single bundle unload was triggered. Good stability.

@thetumbled (Member, Author):

PTAL, thanks! There is a lot of content; I hope it will be helpful to you.
@Demogorgon314 @heesung-sn @BewareMyPower @codelipenghui @Technoboy- @congbobo184 @poorbarcode @lhotari

@lhotari (Member) commented Jun 20, 2024

Impressive work @thetumbled and team!

@heesung-sn (Contributor):

I am curious if we can compare this with the latest shedder, TransferShedder.

@thetumbled (Member, Author) commented Jun 21, 2024

I am curious if we can compare this with the latest shedder, TransferShedder.

TransferShedder can only be used with the new load manager ExtensibleLoadManagerImpl, not with ModularLoadManagerImpl.
I haven't researched ExtensibleLoadManagerImpl yet, but I may study it later.
And I notice that TransferShedder uses a historical weight algorithm too, which may introduce the same problem I analyzed in the doc.

@Demogorgon314 (Member) left a comment:

Great work!

@Demogorgon314 merged commit 4ac9bc4 into apache:master on Jun 27, 2024
20 checks passed
@Technoboy- added this to the 3.4.0 milestone on Jul 3, 2024
@FieldContext(
    category = CATEGORY_LOAD_BALANCER,
    doc = "In the UniformLoadShedder strategy, the minimum throughput that triggers unload."
)
private int minUnloadMessageThroughput = 1 * 1024 * 1024;
A contributor commented on this code:

It would be great if we could make minUnloadMessageThroughput calculated dynamically. This idea aims to resolve the following point:
When a Pulsar cluster has a huge number of topics/producers/consumers but low traffic, it is hard to determine minUnloadMessageThroughput. If we set this value too small and new traffic comes in, the topics will be transferred between brokers with high frequency.

@thetumbled (Member, Author) replied:

Actually, AvgShedder can deal with such a situation without minUnloadMessageThroughput.
The design of AvgShedder provides high stability.

@heesung-sn (Contributor):

And I notice that TransferShedder uses a historical weight algorithm too, which may introduce the same problem I analyzed in the doc.

I assume you refer to this part.

This is because the historical scoring algorithm caused the score of XXX.206:8081 to change slowly: even though it had already unloaded bundles and its actual load had dropped accordingly, its score remained very high, so it kept being judged as an overloaded broker and kept unloading bundles.

Actually, TransferShedder tries to provide correct load data to the leader by:

  1. Immediately nullifying its load data in the metadata store when a broker is involved in any unloading, so the leader will not use the outdated load data in the next run.
  2. Computing the historical load score on each broker instead of on the leader.
  3. Making the leader wait for the new load scores from the unloaded brokers.

@thetumbled (Member, Author) commented Jul 24, 2024

And I notice that TransferShedder uses a historical weight algorithm too, which may introduce the same problem I analyzed in the doc.

I assume you refer to this part.

This is because the historical scoring algorithm caused the score of XXX.206:8081 to change slowly: even though it had already unloaded bundles and its actual load had dropped accordingly, its score remained very high, so it kept being judged as an overloaded broker and kept unloading bundles.

yes.

Actually, TransferShedder tries to provide correct load data to the leader by:

  1. Immediately nullifying its load data in the metadata store when a broker is involved in any unloading, so the leader will not use the outdated load data in the next run.
  2. Computing the historical load score on each broker instead of on the leader.
  3. Making the leader wait for the new load scores from the unloaded brokers.

It may relieve the defects of the historical scoring algorithm, but I am worried that this kind of improvement binds the algorithm and the load manager together. Maybe we will implement AvgShedder in the new load manager, right?

Actually, the historical scoring algorithm introduces too much complexity into the load balancing module, and the benefit it brings is not that significant. When a wrong shedding decision is made, it is hard to debug why it was made. The improvement you describe would introduce even more complexity.

I prefer to introduce AvgShedder into new load manager first, WDYT? @Demogorgon314 @heesung-sn

@heesung-sn (Contributor):

I prefer to introduce AvgShedder into new load manager first, WDYT?

You meant the modular load manager here?

Yes, AvgShedder is an improvement in the modular load manager, indeed.

@thetumbled (Member, Author):

I prefer to introduce AvgShedder into new load manager first, WDYT?

You meant the modular load manager here?

Yes, AvgShedder is an improvement in the modular load manager, indeed.

No, I mean we can try to introduce AvgShedder into ExtensibleLoadManager.

AvgShedder has already been introduced into the modular load manager. 😂

@heesung-sn (Contributor):

It would be great if the community started testing and improving ExtensibleLoadManager.
