Skip to content

Commit

Permalink
Added consistent hashing strategy to target-allocator (#1087)
Browse files Browse the repository at this point in the history
* Added consistent hashing strategy

* testing fix, metrics fix

* Add configurable replicas

* Fix linting

* Fix test and add a new one

* Rename var to rerun tests

* Comments and things

* Grammar

* docs 🤦
  • Loading branch information
jaronoff97 authored Sep 16, 2022
1 parent 50592cc commit 5c3e286
Show file tree
Hide file tree
Showing 16 changed files with 455 additions and 41 deletions.
9 changes: 8 additions & 1 deletion apis/v1alpha1/opentelemetrycollector_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,14 @@ type OpenTelemetryCollectorSpec struct {

// OpenTelemetryTargetAllocator defines the configurations for the Prometheus target allocator.
type OpenTelemetryTargetAllocator struct {
// AllocationStrategy determines which strategy the target allocator should use for allocation
// Replicas is the number of pod instances for the underlying TargetAllocator. This should only be set to a value
// other than 1 if a strategy that allows for high availability is chosen. Currently, the only allocation strategy
// that can be run in a high availability mode is consistent-hashing.
// +optional
Replicas *int32 `json:"replicas,omitempty"`
// AllocationStrategy determines which strategy the target allocator should use for allocation.
// The current options are least-weighted and consistent-hashing. The default option is least-weighted
// +optional
AllocationStrategy string `json:"allocationStrategy,omitempty"`
// ServiceAccount indicates the name of an existing service account to use with this instance.
// +optional
Expand Down
9 changes: 6 additions & 3 deletions apis/v1alpha1/opentelemetrycollector_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,15 @@ func (r *OpenTelemetryCollector) Default() {
r.Labels["app.kubernetes.io/managed-by"] = "opentelemetry-operator"
}

// We can default to one because dependent objects Deployment and HorizontalPodAutoScaler
// default to 1 as well.
one := int32(1)
if r.Spec.Replicas == nil {
// We can default to one because dependent objects Deployment and HorizontalPodAutoScaler
// default to 1 as well.
one := int32(1)
r.Spec.Replicas = &one
}
if r.Spec.TargetAllocator.Enabled && r.Spec.TargetAllocator.Replicas == nil {
r.Spec.TargetAllocator.Replicas = &one
}
}

// +kubebuilder:webhook:verbs=create;update,path=/validate-opentelemetry-io-v1alpha1-opentelemetrycollector,mutating=false,failurePolicy=fail,groups=opentelemetry.io,resources=opentelemetrycollectors,versions=v1alpha1,name=vopentelemetrycollectorcreateupdate.kb.io,sideEffects=none,admissionReviewVersions=v1
Expand Down
7 changes: 6 additions & 1 deletion apis/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 11 additions & 1 deletion bundle/manifests/opentelemetry.io_opentelemetrycollectors.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -809,7 +809,9 @@ spec:
properties:
allocationStrategy:
description: AllocationStrategy determines which strategy the
target allocator should use for allocation
target allocator should use for allocation. The current options
are least-weighted and consistent-hashing. The default option
is least-weighted
type: string
enabled:
description: Enabled indicates whether to use a target allocation
Expand All @@ -831,6 +833,14 @@ spec:
custom resources as targets or not.
type: boolean
type: object
replicas:
description: Replicas is the number of pod instances for the underlying
TargetAllocator. This should only be set to a value other than
1 if a strategy that allows for high availability is chosen.
Currently, the only allocation strategy that can be run in a
high availability mode is consistent-hashing.
format: int32
type: integer
serviceAccount:
description: ServiceAccount indicates the name of an existing
service account to use with this instance.
Expand Down
196 changes: 196 additions & 0 deletions cmd/otel-allocator/allocation/consistent_hashing.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
package allocation

import (
"fmt"
"net/url"
"sync"

"github.com/buraksezer/consistent"
"github.com/cespare/xxhash/v2"
"github.com/go-logr/logr"
"github.com/prometheus/client_golang/prometheus"

"github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/diff"
)

var _ Allocator = &consistentHashingAllocator{}

const consistentHashingStrategyName = "consistent-hashing"

type hasher struct{}

func (h hasher) Sum64(data []byte) uint64 {
return xxhash.Sum64(data)
}

type consistentHashingAllocator struct {
// m protects consistentHasher, collectors and targetItems for concurrent use.
m sync.RWMutex

consistentHasher *consistent.Consistent

// collectors is a map from a Collector's name to a Collector instance
collectors map[string]*Collector

// targetItems is a map from a target item's hash to the target items allocated state
targetItems map[string]*TargetItem

log logr.Logger
}

func newConsistentHashingAllocator(log logr.Logger) Allocator {
config := consistent.Config{
PartitionCount: 1061,
ReplicationFactor: 5,
Load: 1.1,
Hasher: hasher{},
}
consistentHasher := consistent.New(nil, config)
return &consistentHashingAllocator{
consistentHasher: consistentHasher,
collectors: make(map[string]*Collector),
targetItems: make(map[string]*TargetItem),
log: log,
}
}

// addTargetToTargetItems assigns a target to the collector based on its hash and adds it to the allocator's targetItems
// This method is called from within SetTargets and SetCollectors, which acquire the needed lock.
// This is only called after the collectors are cleared or when a new target has been found in the tempTargetMap
// INVARIANT: c.collectors must have at least 1 collector set
func (c *consistentHashingAllocator) addTargetToTargetItems(target *TargetItem) {
// Check if this is a reassignment, if so, decrement the previous collector's NumTargets
if previousColName, ok := c.collectors[target.CollectorName]; ok {
previousColName.NumTargets--
TargetsPerCollector.WithLabelValues(previousColName.String(), consistentHashingStrategyName).Set(float64(c.collectors[previousColName.String()].NumTargets))
}
colOwner := c.consistentHasher.LocateKey([]byte(target.Hash()))
targetItem := &TargetItem{
JobName: target.JobName,
Link: LinkJSON{Link: fmt.Sprintf("/jobs/%s/targets", url.QueryEscape(target.JobName))},
TargetURL: target.TargetURL,
Label: target.Label,
CollectorName: colOwner.String(),
}
c.targetItems[targetItem.Hash()] = targetItem
c.collectors[colOwner.String()].NumTargets++
TargetsPerCollector.WithLabelValues(colOwner.String(), consistentHashingStrategyName).Set(float64(c.collectors[colOwner.String()].NumTargets))
}

// handleTargets receives the new and removed targets and reconciles the current state.
// Any removals are removed from the allocator's targetItems and unassigned from the corresponding collector
// Any net-new additions are assigned to the next available collector
func (c *consistentHashingAllocator) handleTargets(diff diff.Changes[*TargetItem]) {
// Check for removals
for k, target := range c.targetItems {
// if the current target is in the removals list
if _, ok := diff.Removals()[k]; ok {
col := c.collectors[target.CollectorName]
col.NumTargets--
delete(c.targetItems, k)
TargetsPerCollector.WithLabelValues(target.CollectorName, consistentHashingStrategyName).Set(float64(col.NumTargets))
}
}

// Check for additions
for k, target := range diff.Additions() {
// Do nothing if the item is already there
if _, ok := c.targetItems[k]; ok {
continue
} else {
// Add target to target pool and assign a collector
c.addTargetToTargetItems(target)
}
}
}

// handleCollectors receives the new and removed collectors and reconciles the current state.
// Any removals are removed from the allocator's collectors. New collectors are added to the allocator's collector map
// Finally, update all targets' collectors to match the consistent hashing.
func (c *consistentHashingAllocator) handleCollectors(diff diff.Changes[*Collector]) {
// Clear removed collectors
for _, k := range diff.Removals() {
delete(c.collectors, k.Name)
c.consistentHasher.Remove(k.Name)
TargetsPerCollector.WithLabelValues(k.Name, consistentHashingStrategyName).Set(0)
}
// Insert the new collectors
for _, i := range diff.Additions() {
c.collectors[i.Name] = NewCollector(i.Name)
c.consistentHasher.Add(c.collectors[i.Name])
}

// Re-Allocate all targets
for _, item := range c.targetItems {
c.addTargetToTargetItems(item)
}
}

// SetTargets accepts a list of targets that will be used to make
// load balancing decisions. This method should be called when there are
// new targets discovered or existing targets are shutdown.
func (c *consistentHashingAllocator) SetTargets(targets map[string]*TargetItem) {
timer := prometheus.NewTimer(TimeToAssign.WithLabelValues("SetTargets", consistentHashingStrategyName))
defer timer.ObserveDuration()

c.m.Lock()
defer c.m.Unlock()

if len(c.collectors) == 0 {
c.log.Info("No collector instances present, cannot set targets")
return
}
// Check for target changes
targetsDiff := diff.Maps(c.targetItems, targets)
// If there are any additions or removals
if len(targetsDiff.Additions()) != 0 || len(targetsDiff.Removals()) != 0 {
c.handleTargets(targetsDiff)
}
return
}

// SetCollectors sets the set of collectors with key=collectorName, value=Collector object.
// This method is called when Collectors are added or removed.
func (c *consistentHashingAllocator) SetCollectors(collectors map[string]*Collector) {
log := c.log.WithValues("component", "opentelemetry-targetallocator")
timer := prometheus.NewTimer(TimeToAssign.WithLabelValues("SetCollectors", consistentHashingStrategyName))
defer timer.ObserveDuration()

CollectorsAllocatable.WithLabelValues(consistentHashingStrategyName).Set(float64(len(collectors)))
if len(collectors) == 0 {
log.Info("No collector instances present")
return
}

c.m.Lock()
defer c.m.Unlock()

// Check for collector changes
collectorsDiff := diff.Maps(c.collectors, collectors)
if len(collectorsDiff.Additions()) != 0 || len(collectorsDiff.Removals()) != 0 {
c.handleCollectors(collectorsDiff)
}
return
}

// TargetItems returns a shallow copy of the targetItems map.
func (c *consistentHashingAllocator) TargetItems() map[string]*TargetItem {
c.m.RLock()
defer c.m.RUnlock()
targetItemsCopy := make(map[string]*TargetItem)
for k, v := range c.targetItems {
targetItemsCopy[k] = v
}
return targetItemsCopy
}

// Collectors returns a shallow copy of the collectors map.
func (c *consistentHashingAllocator) Collectors() map[string]*Collector {
c.m.RLock()
defer c.m.RUnlock()
collectorsCopy := make(map[string]*Collector)
for k, v := range c.collectors {
collectorsCopy[k] = v
}
return collectorsCopy
}
92 changes: 92 additions & 0 deletions cmd/otel-allocator/allocation/consistent_hashing_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package allocation

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestCanSetSingleTarget(t *testing.T) {
cols := makeNCollectors(3, 0, 0)
c := newConsistentHashingAllocator(logger)
c.SetCollectors(cols)
c.SetTargets(makeNNewTargets(1, 3, 0))
actualTargetItems := c.TargetItems()
assert.Len(t, actualTargetItems, 1)
for _, item := range actualTargetItems {
assert.Equal(t, "collector-2", item.CollectorName)
}
}

func TestRelativelyEvenDistribution(t *testing.T) {
numCols := 15
numItems := 10000
cols := makeNCollectors(numCols, 0, 0)
var expectedPerCollector = float64(numItems / numCols)
expectedDelta := (expectedPerCollector * 1.5) - expectedPerCollector
c := newConsistentHashingAllocator(logger)
c.SetCollectors(cols)
c.SetTargets(makeNNewTargets(numItems, 0, 0))
actualTargetItems := c.TargetItems()
assert.Len(t, actualTargetItems, numItems)
actualCollectors := c.Collectors()
assert.Len(t, actualCollectors, numCols)
for _, col := range actualCollectors {
t.Logf("col: %s \ttargets: %d", col.Name, col.NumTargets)
assert.InDelta(t, col.NumTargets, expectedPerCollector, expectedDelta)
}
}

func TestFullReallocation(t *testing.T) {
cols := makeNCollectors(10, 0, 0)
c := newConsistentHashingAllocator(logger)
c.SetCollectors(cols)
c.SetTargets(makeNNewTargets(10000, 10, 0))
actualTargetItems := c.TargetItems()
assert.Len(t, actualTargetItems, 10000)
actualCollectors := c.Collectors()
assert.Len(t, actualCollectors, 10)
newCols := makeNCollectors(10, 0, 10)
c.SetCollectors(newCols)
updatedTargetItems := c.TargetItems()
assert.Len(t, updatedTargetItems, 10000)
updatedCollectors := c.Collectors()
assert.Len(t, updatedCollectors, 10)
for _, item := range updatedTargetItems {
_, ok := updatedCollectors[item.CollectorName]
assert.True(t, ok, "Some items weren't reallocated correctly")
}
}

func TestNumRemapped(t *testing.T) {
numItems := 10_000
numInitialCols := 15
numFinalCols := 16
expectedDelta := float64((numFinalCols - numInitialCols) * (numItems / numFinalCols))
cols := makeNCollectors(numInitialCols, 0, 0)
c := newConsistentHashingAllocator(logger)
c.SetCollectors(cols)
c.SetTargets(makeNNewTargets(numItems, numInitialCols, 0))
actualTargetItems := c.TargetItems()
assert.Len(t, actualTargetItems, numItems)
actualCollectors := c.Collectors()
assert.Len(t, actualCollectors, numInitialCols)
newCols := makeNCollectors(numFinalCols, 0, 0)
c.SetCollectors(newCols)
updatedTargetItems := c.TargetItems()
assert.Len(t, updatedTargetItems, numItems)
updatedCollectors := c.Collectors()
assert.Len(t, updatedCollectors, numFinalCols)
countRemapped := 0
countNotRemapped := 0
for _, item := range updatedTargetItems {
previousItem, ok := actualTargetItems[item.Hash()]
assert.True(t, ok)
if previousItem.CollectorName != item.CollectorName {
countRemapped++
} else {
countNotRemapped++
}
}
assert.InDelta(t, numItems/numFinalCols, countRemapped, expectedDelta)
}
4 changes: 2 additions & 2 deletions cmd/otel-allocator/allocation/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,15 +159,15 @@ func TestGetAllTargetsByCollectorAndJob(t *testing.T) {
"test-label": "test-value",
"foo": "bar",
},
TargetURL: "test-url",
TargetURL: "test-url",
CollectorName: "test-collector",
},
TargetItem{
JobName: "test-job",
Label: model.LabelSet{
"test-label": "test-value",
},
TargetURL: "test-url",
TargetURL: "test-url",
CollectorName: "test-collector",
},
},
Expand Down
Loading

0 comments on commit 5c3e286

Please sign in to comment.