-
Notifications
You must be signed in to change notification settings - Fork 464
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Added consistent hashing strategy #1087
Changes from 7 commits
df48eb9
64607dc
50ceaf6
1ce7876
15d2cab
d8be427
ed23ab3
abca69e
dd94eb4
0e5c854
19ed494
df0794e
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -56,12 +56,15 @@ func (r *OpenTelemetryCollector) Default() { | |
r.Labels["app.kubernetes.io/managed-by"] = "opentelemetry-operator" | ||
} | ||
|
||
// We can default to one because dependent objects Deployment and HorizontalPodAutoScaler | ||
// default to 1 as well. | ||
one := int32(1) | ||
if r.Spec.Replicas == nil { | ||
// We can default to one because dependent objects Deployment and HorizontalPodAutoScaler | ||
// default to 1 as well. | ||
one := int32(1) | ||
r.Spec.Replicas = &one | ||
} | ||
if r.Spec.TargetAllocator.Enabled && r.Spec.TargetAllocator.Replicas == nil { | ||
r.Spec.TargetAllocator.Replicas = &one | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is it safe to share the pointer here? If these values are ever modified is it always by assigning a new pointer or is it ever possible that the underlying value will be changed? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think it's okay to modify because this is just for the webhook. The object passed to the operator wouldn't have a spec with a shared pointer (this worked in my testing) |
||
} | ||
} | ||
|
||
// +kubebuilder:webhook:verbs=create;update,path=/validate-opentelemetry-io-v1alpha1-opentelemetrycollector,mutating=false,failurePolicy=fail,groups=opentelemetry.io,resources=opentelemetrycollectors,versions=v1alpha1,name=vopentelemetrycollectorcreateupdate.kb.io,sideEffects=none,admissionReviewVersions=v1 | ||
|
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,194 @@ | ||
package allocation | ||
|
||
import ( | ||
"fmt" | ||
"github.com/buraksezer/consistent" | ||
"github.com/cespare/xxhash/v2" | ||
"github.com/go-logr/logr" | ||
"github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/diff" | ||
jaronoff97 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
"github.com/prometheus/client_golang/prometheus" | ||
"net/url" | ||
"sync" | ||
) | ||
|
||
var _ Allocator = &consistentHashingAllocator{} | ||
|
||
const consistentHashingStrategyName = "consistent-hashing" | ||
|
||
type hasher struct{} | ||
|
||
func (h hasher) Sum64(data []byte) uint64 { | ||
return xxhash.Sum64(data) | ||
} | ||
|
||
type consistentHashingAllocator struct { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. A lot of this code looks very similar to the least weighted strategy. It may be worth investigating in the future if we can combine these as I tried to do in #1068 |
||
// m protects consistentHasher, collectors and targetItems for concurrent use. | ||
m sync.RWMutex | ||
|
||
consistentHasher *consistent.Consistent | ||
|
||
// collectors is a map from a Collector's name to a Collector instance | ||
collectors map[string]*Collector | ||
|
||
// targetItems is a map from a target item's hash to the target items allocated state | ||
targetItems map[string]*TargetItem | ||
|
||
log logr.Logger | ||
} | ||
|
||
func newConsistentHashingAllocator(log logr.Logger) Allocator { | ||
config := consistent.Config{ | ||
PartitionCount: 1061, | ||
ReplicationFactor: 5, | ||
Load: 1.1, | ||
Comment on lines
+43
to
+45
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should these configuration options be exposed to the operator? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not sure... I think i'm going to open a follow up issue to make these configurable as it would require some refactoring of how configuration is passed down to the allocation strategies as right now it's only the string. Is that alright? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, that's fine. I think using the string for configuration might be fine, as long as it's well specified. Something like URL query params style might work:
That would make it a straightforward addition while leaving plenty of flexibility. Don't need to solve now though, can figure it out on the follow up issue. |
||
Hasher: hasher{}, | ||
Comment on lines
+43
to
+46
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. question: should this be configurable |
||
} | ||
consistentHasher := consistent.New(nil, config) | ||
return &consistentHashingAllocator{ | ||
consistentHasher: consistentHasher, | ||
collectors: make(map[string]*Collector), | ||
targetItems: make(map[string]*TargetItem), | ||
log: log, | ||
} | ||
} | ||
|
||
// addTargetToTargetItems assigns a target to the collector based on its hash and adds it to the allocator's targetItems | ||
// This method is called from within SetTargets and SetCollectors, which acquire the needed lock. | ||
// This is only called after the collectors are cleared or when a new target has been found in the tempTargetMap | ||
// INVARIANT: c.collectors must have at least 1 collector set | ||
func (c *consistentHashingAllocator) addTargetToTargetItems(target *TargetItem) { | ||
// Check if this is a reassignment, if so, decrement the previous collector's NumTargets | ||
if previousColName, ok := c.collectors[target.CollectorName]; ok { | ||
previousColName.NumTargets-- | ||
TargetsPerCollector.WithLabelValues(previousColName.String(), consistentHashingStrategyName).Set(float64(c.collectors[previousColName.String()].NumTargets)) | ||
} | ||
colOwner := c.consistentHasher.LocateKey([]byte(target.Hash())) | ||
targetItem := &TargetItem{ | ||
JobName: target.JobName, | ||
Link: LinkJSON{Link: fmt.Sprintf("/jobs/%s/targets", url.QueryEscape(target.JobName))}, | ||
TargetURL: target.TargetURL, | ||
Label: target.Label, | ||
CollectorName: colOwner.String(), | ||
} | ||
c.targetItems[targetItem.Hash()] = targetItem | ||
c.collectors[colOwner.String()].NumTargets++ | ||
jaronoff97 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
TargetsPerCollector.WithLabelValues(colOwner.String(), consistentHashingStrategyName).Set(float64(c.collectors[colOwner.String()].NumTargets)) | ||
} | ||
|
||
// handleTargets receives the new and removed targets and reconciles the current state. | ||
// Any removals are removed from the allocator's targetItems and unassigned from the corresponding collector | ||
// Any net-new additions are assigned to the next available collector | ||
func (c *consistentHashingAllocator) handleTargets(diff diff.Changes[*TargetItem]) { | ||
// Check for removals | ||
for k, target := range c.targetItems { | ||
// if the current target is in the removals list | ||
if _, ok := diff.Removals()[k]; ok { | ||
col := c.collectors[target.CollectorName] | ||
col.NumTargets-- | ||
delete(c.targetItems, k) | ||
TargetsPerCollector.WithLabelValues(target.CollectorName, consistentHashingStrategyName).Set(float64(col.NumTargets)) | ||
} | ||
} | ||
|
||
// Check for additions | ||
for k, target := range diff.Additions() { | ||
// Do nothing if the item is already there | ||
if _, ok := c.targetItems[k]; ok { | ||
continue | ||
} else { | ||
Comment on lines
+97
to
+100
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this necessary? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I believe this is still necessary. the provision in |
||
// Add target to target pool and assign a collector | ||
c.addTargetToTargetItems(target) | ||
} | ||
} | ||
} | ||
|
||
// handleCollectors receives the new and removed collectors and reconciles the current state. | ||
// Any removals are removed from the allocator's collectors. New collectors are added to the allocator's collector map | ||
// Finally, update all targets' collectors to match the consistent hashing. | ||
func (c *consistentHashingAllocator) handleCollectors(diff diff.Changes[*Collector]) { | ||
// Clear removed collectors | ||
for _, k := range diff.Removals() { | ||
delete(c.collectors, k.Name) | ||
c.consistentHasher.Remove(k.Name) | ||
TargetsPerCollector.WithLabelValues(k.Name, consistentHashingStrategyName).Set(0) | ||
} | ||
// Insert the new collectors | ||
for _, i := range diff.Additions() { | ||
c.collectors[i.Name] = NewCollector(i.Name) | ||
c.consistentHasher.Add(c.collectors[i.Name]) | ||
} | ||
|
||
// Re-Allocate all targets | ||
for _, item := range c.targetItems { | ||
c.addTargetToTargetItems(item) | ||
} | ||
} | ||
|
||
// SetTargets accepts a list of targets that will be used to make | ||
// load balancing decisions. This method should be called when there are | ||
// new targets discovered or existing targets are shutdown. | ||
func (c *consistentHashingAllocator) SetTargets(targets map[string]*TargetItem) { | ||
timer := prometheus.NewTimer(TimeToAssign.WithLabelValues("SetTargets", consistentHashingStrategyName)) | ||
defer timer.ObserveDuration() | ||
|
||
c.m.Lock() | ||
defer c.m.Unlock() | ||
|
||
if len(c.collectors) == 0 { | ||
c.log.Info("No collector instances present, cannot set targets") | ||
return | ||
} | ||
// Check for target changes | ||
targetsDiff := diff.Maps(c.targetItems, targets) | ||
// If there are any additions or removals | ||
if len(targetsDiff.Additions()) != 0 || len(targetsDiff.Removals()) != 0 { | ||
c.handleTargets(targetsDiff) | ||
} | ||
return | ||
} | ||
|
||
// SetCollectors sets the set of collectors with key=collectorName, value=Collector object. | ||
// This method is called when Collectors are added or removed. | ||
func (c *consistentHashingAllocator) SetCollectors(collectors map[string]*Collector) { | ||
log := c.log.WithValues("component", "opentelemetry-targetallocator") | ||
timer := prometheus.NewTimer(TimeToAssign.WithLabelValues("SetCollectors", consistentHashingStrategyName)) | ||
defer timer.ObserveDuration() | ||
|
||
CollectorsAllocatable.WithLabelValues(consistentHashingStrategyName).Set(float64(len(collectors))) | ||
if len(collectors) == 0 { | ||
log.Info("No collector instances present") | ||
return | ||
} | ||
|
||
c.m.Lock() | ||
defer c.m.Unlock() | ||
|
||
// Check for collector changes | ||
collectorsDiff := diff.Maps(c.collectors, collectors) | ||
if len(collectorsDiff.Additions()) != 0 || len(collectorsDiff.Removals()) != 0 { | ||
c.handleCollectors(collectorsDiff) | ||
} | ||
return | ||
} | ||
|
||
// TargetItems returns a shallow copy of the targetItems map. | ||
func (c *consistentHashingAllocator) TargetItems() map[string]*TargetItem { | ||
c.m.RLock() | ||
defer c.m.RUnlock() | ||
targetItemsCopy := make(map[string]*TargetItem) | ||
for k, v := range c.targetItems { | ||
targetItemsCopy[k] = v | ||
} | ||
return targetItemsCopy | ||
} | ||
|
||
// Collectors returns a shallow copy of the collectors map. | ||
func (c *consistentHashingAllocator) Collectors() map[string]*Collector { | ||
c.m.RLock() | ||
defer c.m.RUnlock() | ||
collectorsCopy := make(map[string]*Collector) | ||
for k, v := range c.collectors { | ||
collectorsCopy[k] = v | ||
} | ||
return collectorsCopy | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,91 @@ | ||
package allocation | ||
|
||
import ( | ||
"github.com/stretchr/testify/assert" | ||
"testing" | ||
) | ||
|
||
func TestCanSetSingleTarget(t *testing.T) { | ||
cols := makeNCollectors(3, 0, 0) | ||
c := newConsistentHashingAllocator(logger) | ||
c.SetCollectors(cols) | ||
c.SetTargets(makeNNewTargets(1, 3, 0)) | ||
actualTargetItems := c.TargetItems() | ||
assert.Len(t, actualTargetItems, 1) | ||
for _, item := range actualTargetItems { | ||
assert.Equal(t, "collector-2", item.CollectorName) | ||
} | ||
} | ||
|
||
func TestRelativelyEvenDistribution(t *testing.T) { | ||
numCols := 15 | ||
numItems := 10000 | ||
cols := makeNCollectors(numCols, 0, 0) | ||
var expectedPerCollector = float64(numItems / numCols) | ||
expectedDelta := (expectedPerCollector * 1.5) - expectedPerCollector | ||
c := newConsistentHashingAllocator(logger) | ||
c.SetCollectors(cols) | ||
c.SetTargets(makeNNewTargets(numItems, 0, 0)) | ||
actualTargetItems := c.TargetItems() | ||
assert.Len(t, actualTargetItems, numItems) | ||
actualCollectors := c.Collectors() | ||
assert.Len(t, actualCollectors, numCols) | ||
for _, col := range actualCollectors { | ||
t.Logf("col: %s \ttargets: %d", col.Name, col.NumTargets) | ||
assert.InDelta(t, col.NumTargets, expectedPerCollector, expectedDelta) | ||
} | ||
} | ||
|
||
func TestFullReallocation(t *testing.T) { | ||
cols := makeNCollectors(10, 0, 0) | ||
c := newConsistentHashingAllocator(logger) | ||
c.SetCollectors(cols) | ||
c.SetTargets(makeNNewTargets(10000, 10, 0)) | ||
actualTargetItems := c.TargetItems() | ||
assert.Len(t, actualTargetItems, 10000) | ||
actualCollectors := c.Collectors() | ||
assert.Len(t, actualCollectors, 10) | ||
newCols := makeNCollectors(10, 0, 10) | ||
c.SetCollectors(newCols) | ||
updatedTargetItems := c.TargetItems() | ||
assert.Len(t, updatedTargetItems, 10000) | ||
updatedCollectors := c.Collectors() | ||
assert.Len(t, updatedCollectors, 10) | ||
for _, item := range updatedTargetItems { | ||
_, ok := updatedCollectors[item.CollectorName] | ||
assert.True(t, ok, "Some items weren't reallocated correctly") | ||
} | ||
} | ||
|
||
func TestNumRemapped(t *testing.T) { | ||
numItems := 10_000 | ||
numInitialCols := 15 | ||
numFinalCols := 16 | ||
expectedDelta := float64((numFinalCols - numInitialCols) * (numItems / numFinalCols)) | ||
cols := makeNCollectors(numInitialCols, 0, 0) | ||
c := newConsistentHashingAllocator(logger) | ||
c.SetCollectors(cols) | ||
c.SetTargets(makeNNewTargets(numItems, numInitialCols, 0)) | ||
actualTargetItems := c.TargetItems() | ||
assert.Len(t, actualTargetItems, numItems) | ||
actualCollectors := c.Collectors() | ||
assert.Len(t, actualCollectors, numInitialCols) | ||
newCols := makeNCollectors(numFinalCols, 0, 0) | ||
c.SetCollectors(newCols) | ||
updatedTargetItems := c.TargetItems() | ||
assert.Len(t, updatedTargetItems, numItems) | ||
updatedCollectors := c.Collectors() | ||
assert.Len(t, updatedCollectors, numFinalCols) | ||
countRemapped := 0 | ||
countNotRemapped := 0 | ||
for _, item := range updatedTargetItems { | ||
previousItem, ok := actualTargetItems[item.Hash()] | ||
assert.True(t, ok) | ||
if previousItem.CollectorName != item.CollectorName { | ||
countRemapped++ | ||
} else { | ||
countNotRemapped++ | ||
} | ||
} | ||
assert.InDelta(t, numItems/numFinalCols, countRemapped, expectedDelta) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there an enforcement mechanism, or is this only advisory?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Which are the strategies that allow HA?
AllocationStrategy
is a string how does the user know how to configure it?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is only advisory, the logic for enforcing may be simple now, but will be more complicated in the future when we have things like leader election or maybe even another method like distributed state. I also was thinking if someone wanted to plug in their own TA that the operator would only know the image of, we wouldn't have a way of changing the validated webhook to know to accept the custom TA strategy.