Skip to content

Commit

Permalink
feat: add memory limiter to drop data when a soft limit is reached (#…
Browse files Browse the repository at this point in the history
…1827)

## Problem

At the moment, if there is pressure in the pipeline for any reason, and
batches are failed to export, they will start building up in the queues
of the collector exporter and grow memory unboundly.

Since we don't set any memory request or limit on the node collectors
ds, they will just go on to consume more and more of the available
memory on the node:

1. Will show a pick in resource consumption on the cluster metrics.
2. Starve other pods on the same node, which now has less spare memory
to grow into.
3. If the issue is not transient, the memory will just keep increasing
over time
4. The amount of data in the retry buffers, will keep the CPU busy
attempting to retry the rejected or unsuccessful batches.

## Levels of Protections

To prevent the above issues, we imply few level of protections, listed
from first line to last resort:

1. setting GOMEMLIMIT to a (now hardcoded constant) `352MiB`. At this
point, go runtime GC should kick in and start reclaiming memory
aggressively.
2. Setting the otel collector soft limit to (now hardcoded constant)
`384MiB`. When the heap allocations reach this amount, the collector
will start dropping batches of data after they are exported from the
`batch` processor, instead of streaming them down the pipeline.
3. Setting the otel collector hard limit to `512MiB`. When the heap
reaches this number, a forced GC is performed.
4. Setting the memory request to `256MiB`. This ensures we have at least
this amount of memory to handle normal traffic and some slack for spikes
without running into OOM. the rest of the memory is consumed from
available memory on the node which by handy for more buffering, but may
also cause OOM if the node has no resources.

## Future Work

- Add configuration options to set these values, preferably as a
spectrum for trace-offs: "resource-stability", "resource-spikecapacity"

- drop the data as it received not after it is batched -
open-telemetry/opentelemetry-collector#11726

- drop data at receiver when it's implemented in collector -
open-telemetry/opentelemetry-collector#9591
  • Loading branch information
blumamir authored Dec 4, 2024
1 parent 5d612b2 commit 6131199
Show file tree
Hide file tree
Showing 7 changed files with 103 additions and 23 deletions.
7 changes: 6 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
TAG ?= $(shell odigos version --cluster)
ODIGOS_CLI_VERSION ?= $(shell odigos version --cli)
ORG := keyval
ORG ?= keyval

.PHONY: build-odiglet
build-odiglet:
Expand Down Expand Up @@ -62,13 +62,18 @@ push-scheduler:
push-collector:
docker buildx build --platform linux/amd64,linux/arm64/v8 --push -t $(ORG)/odigos-collector:$(TAG) collector -f collector/Dockerfile

.PHONY: push-ui
push-ui:
docker buildx build --platform linux/amd64,linux/arm64/v8 --push -t $(ORG)/odigos-ui:$(TAG) . -f frontend/Dockerfile

.PHONY: push-images
push-images:
make push-autoscaler TAG=$(TAG)
make push-scheduler TAG=$(TAG)
make push-odiglet TAG=$(TAG)
make push-instrumentor TAG=$(TAG)
make push-collector TAG=$(TAG)
make push-ui TAG=$(TAG)

.PHONY: load-to-kind-odiglet
load-to-kind-odiglet:
Expand Down
17 changes: 17 additions & 0 deletions autoscaler/controllers/common/memorylimiter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package common

import (
odigosv1 "github.com/odigos-io/odigos/api/odigos/v1alpha1"
"github.com/odigos-io/odigos/common/config"
)

func GetMemoryLimiterConfig(memorySettings odigosv1.CollectorsGroupResourcesSettings) config.GenericMap {
// check_interval is currently hardcoded to 1s
// this seems to be a reasonable value for the memory limiter and what the processor uses in docs.
// preforming memory checks is expensive, so we trade off performance with fast reaction time to memory pressure.
return config.GenericMap{
"check_interval": "1s",
"limit_mib": memorySettings.MemoryLimiterLimitMiB,
"spike_limit_mib": memorySettings.MemoryLimiterSpikeLimitMiB,
}
}
36 changes: 23 additions & 13 deletions autoscaler/controllers/datacollection/configmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@ import (

"github.com/ghodss/yaml"
odigosv1 "github.com/odigos-io/odigos/api/odigos/v1alpha1"
"github.com/odigos-io/odigos/autoscaler/controllers/common"
commonconf "github.com/odigos-io/odigos/autoscaler/controllers/common"
"github.com/odigos-io/odigos/autoscaler/controllers/datacollection/custom"
"github.com/odigos-io/odigos/common"
odigoscommon "github.com/odigos-io/odigos/common"
"github.com/odigos-io/odigos/common/config"
"github.com/odigos-io/odigos/common/consts"
constsK8s "github.com/odigos-io/odigos/k8sutils/pkg/consts"
Expand Down Expand Up @@ -124,23 +125,26 @@ func getDesiredConfigMap(apps *odigosv1.InstrumentedApplicationList, dests *odig
return &desired, nil
}

func calculateConfigMapData(collectorsGroup *odigosv1.CollectorsGroup, apps *odigosv1.InstrumentedApplicationList, dests *odigosv1.DestinationList, processors []*odigosv1.Processor,
func calculateConfigMapData(nodeCG *odigosv1.CollectorsGroup, apps *odigosv1.InstrumentedApplicationList, dests *odigosv1.DestinationList, processors []*odigosv1.Processor,
setTracesLoadBalancer bool, disableNameProcessor bool) (string, error) {

ownMetricsPort := collectorsGroup.Spec.CollectorOwnMetricsPort
ownMetricsPort := nodeCG.Spec.CollectorOwnMetricsPort

empty := struct{}{}

processorsCfg, tracesProcessors, metricsProcessors, logsProcessors, errs := config.GetCrdProcessorsConfigMap(commonconf.ToProcessorConfigurerArray(processors))
for name, err := range errs {
log.Log.V(0).Info(err.Error(), "processor", name)
log.Log.V(0).Error(err, "processor", name)
}

if !disableNameProcessor {
processorsCfg["odigosresourcename"] = empty
}

memoryLimiterConfiguration := common.GetMemoryLimiterConfig(nodeCG.Spec.ResourcesSettings)

processorsCfg["batch"] = empty
processorsCfg["memory_limiter"] = memoryLimiterConfiguration
processorsCfg["resource"] = config.GenericMap{
"attributes": []config.GenericMap{{
"key": "k8s.node.name",
Expand Down Expand Up @@ -266,13 +270,13 @@ func calculateConfigMapData(collectorsGroup *odigosv1.CollectorsGroup, apps *odi
collectLogs := false
for _, dst := range dests.Items {
for _, s := range dst.Spec.Signals {
if s == common.LogsObservabilitySignal && !custom.DestRequiresCustom(dst.Spec.Type) {
if s == odigoscommon.LogsObservabilitySignal && !custom.DestRequiresCustom(dst.Spec.Type) {
collectLogs = true
}
if s == common.TracesObservabilitySignal || dst.Spec.Type == common.PrometheusDestinationType {
if s == odigoscommon.TracesObservabilitySignal || dst.Spec.Type == odigoscommon.PrometheusDestinationType {
collectTraces = true
}
if s == common.MetricsObservabilitySignal && !custom.DestRequiresCustom(dst.Spec.Type) {
if s == odigoscommon.MetricsObservabilitySignal && !custom.DestRequiresCustom(dst.Spec.Type) {
collectMetrics = true
}
}
Expand Down Expand Up @@ -364,7 +368,7 @@ func getConfigMap(ctx context.Context, c client.Client, namespace string) (*v1.C
return configMap, nil
}

func getSignalsFromOtelcolConfig(otelcolConfigContent string) ([]common.ObservabilitySignal, error) {
func getSignalsFromOtelcolConfig(otelcolConfigContent string) ([]odigoscommon.ObservabilitySignal, error) {
config := config.Config{}
err := yaml.Unmarshal([]byte(otelcolConfigContent), &config)
if err != nil {
Expand All @@ -389,22 +393,28 @@ func getSignalsFromOtelcolConfig(otelcolConfigContent string) ([]common.Observab
}
}

signals := []common.ObservabilitySignal{}
signals := []odigoscommon.ObservabilitySignal{}
if tracesEnabled {
signals = append(signals, common.TracesObservabilitySignal)
signals = append(signals, odigoscommon.TracesObservabilitySignal)
}
if metricsEnabled {
signals = append(signals, common.MetricsObservabilitySignal)
signals = append(signals, odigoscommon.MetricsObservabilitySignal)
}
if logsEnabled {
signals = append(signals, common.LogsObservabilitySignal)
signals = append(signals, odigoscommon.LogsObservabilitySignal)
}

return signals, nil
}

func getCommonProcessors(disableNameProcessor bool) []string {
processors := []string{"batch"}
// memory limiter is placed right after batch processor an not the first processor in pipeline
// this is so that instrumented application always succeeds in sending data to the collector
// (on it being added to a batch) and checking the memory limit later after the batch
// where memory rejection would drop the data instead of backpressuring the application.
// Read more about it here: https://github.com/open-telemetry/opentelemetry-collector/issues/11726
// Also related: https://github.com/open-telemetry/opentelemetry-collector/issues/9591
processors := []string{"batch", "memory_limiter"}
if !disableNameProcessor {
processors = append(processors, "odigosresourcename")
}
Expand Down
16 changes: 16 additions & 0 deletions autoscaler/controllers/datacollection/daemonset.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
Expand Down Expand Up @@ -191,6 +192,9 @@ func getDesiredDaemonSet(datacollection *odigosv1.CollectorsGroup, configData st
rollingUpdate.MaxSurge = &maxSurge
}

requestMemoryRequestQuantity := resource.MustParse(fmt.Sprintf("%dMi", datacollection.Spec.ResourcesSettings.MemoryRequestMiB))
requestMemoryLimitQuantity := resource.MustParse(fmt.Sprintf("%dMi", datacollection.Spec.ResourcesSettings.MemoryLimitMiB))

desiredDs := &appsv1.DaemonSet{
ObjectMeta: metav1.ObjectMeta{
Name: consts.OdigosNodeCollectorDaemonSetName,
Expand Down Expand Up @@ -302,6 +306,10 @@ func getDesiredDaemonSet(datacollection *odigosv1.CollectorsGroup, configData st
},
},
},
{
Name: "GOMEMLIMIT",
Value: fmt.Sprintf("%dMiB", datacollection.Spec.ResourcesSettings.GomemlimitMiB),
},
},
LivenessProbe: &corev1.Probe{
ProbeHandler: corev1.ProbeHandler{
Expand All @@ -319,6 +327,14 @@ func getDesiredDaemonSet(datacollection *odigosv1.CollectorsGroup, configData st
},
},
},
Resources: corev1.ResourceRequirements{
Requests: corev1.ResourceList{
corev1.ResourceMemory: requestMemoryRequestQuantity,
},
Limits: corev1.ResourceList{
corev1.ResourceMemory: requestMemoryLimitQuantity,
},
},
SecurityContext: &corev1.SecurityContext{
Privileged: boolPtr(true),
},
Expand Down
7 changes: 1 addition & 6 deletions autoscaler/controllers/gateway/configmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,12 +113,7 @@ func addSelfTelemetryPipeline(c *config.Config, ownTelemetryPort int32) error {

func syncConfigMap(dests *odigosv1.DestinationList, allProcessors *odigosv1.ProcessorList, gateway *odigosv1.CollectorsGroup, ctx context.Context, c client.Client, scheme *runtime.Scheme) (string, []odigoscommon.ObservabilitySignal, error) {
logger := log.FromContext(ctx)

memoryLimiterConfiguration := config.GenericMap{
"check_interval": "1s",
"limit_mib": gateway.Spec.ResourcesSettings.MemoryLimiterLimitMiB,
"spike_limit_mib": gateway.Spec.ResourcesSettings.MemoryLimiterSpikeLimitMiB,
}
memoryLimiterConfiguration := common.GetMemoryLimiterConfig(gateway.Spec.ResourcesSettings)

processors := common.FilterAndSortProcessorsByOrderHint(allProcessors, odigosv1.CollectorsGroupRoleClusterGateway)

Expand Down
34 changes: 31 additions & 3 deletions scheduler/controllers/nodecollectorsgroup/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,44 @@ import (

odigosv1 "github.com/odigos-io/odigos/api/odigos/v1alpha1"
"github.com/odigos-io/odigos/common"
"github.com/odigos-io/odigos/k8sutils/pkg/consts"
k8sutilsconsts "github.com/odigos-io/odigos/k8sutils/pkg/consts"
"github.com/odigos-io/odigos/k8sutils/pkg/env"
"github.com/odigos-io/odigos/k8sutils/pkg/utils"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
)

func getMemorySettings(odigosConfig common.OdigosConfiguration) odigosv1.CollectorsGroupResourcesSettings {
// TODO: currently using hardcoded values, should be configurable.
//
// memory request is expensive on daemonsets since it will consume this memory
// on each node in the cluster. setting to 256, but allowing memory to spike higher
// to consume more available memory on the node.
// if the node has memory to spare, we can use it to buffer more data before dropping,
// but it also means that if no memory is available, collector might get killed by OOM killer.
//
// we can trade-off the memory request:
// - more memory request: more memory allocated per collector on each node, but more buffer for bursts and transient failures.
// - less memory request: efficient use of cluster resources, but data might be dropped earlier on spikes.
// currently choosing 256MiB as a balance (~200MiB left for heap to handle batches and export queues).
//
// we can trade-off how high the memory limit is set above the request:
// - limit is set to request: collector most stable (no OOM) but smaller buffer for bursts and early data drop.
// - limit is set way above request: in case of memory spike, collector will use extra memory available on the node to buffer data, but might get killed by OOM killer if this memory is not available.
// currently choosing 512MiB as a balance (200MiB guaranteed for heap, and the rest ~300MiB of buffer from node before start dropping).
//
return odigosv1.CollectorsGroupResourcesSettings{
MemoryRequestMiB: 256,
MemoryLimitMiB: 512 + 64,
MemoryLimiterLimitMiB: 512,
MemoryLimiterSpikeLimitMiB: 128, // meaning that collector will start dropping data at 512-128=384MiB
GomemlimitMiB: 512 - 128 - 32, // start aggressive GC 32 MiB before soft limit and dropping data
}
}

func newNodeCollectorGroup(odigosConfig common.OdigosConfiguration) *odigosv1.CollectorsGroup {

ownMetricsPort := consts.OdigosNodeCollectorOwnTelemetryPortDefault
ownMetricsPort := k8sutilsconsts.OdigosNodeCollectorOwnTelemetryPortDefault
if odigosConfig.CollectorNode != nil && odigosConfig.CollectorNode.CollectorOwnMetricsPort != 0 {
ownMetricsPort = odigosConfig.CollectorNode.CollectorOwnMetricsPort
}
Expand All @@ -27,12 +54,13 @@ func newNodeCollectorGroup(odigosConfig common.OdigosConfiguration) *odigosv1.Co
APIVersion: "odigos.io/v1alpha1",
},
ObjectMeta: metav1.ObjectMeta{
Name: consts.OdigosNodeCollectorDaemonSetName,
Name: k8sutilsconsts.OdigosNodeCollectorDaemonSetName,
Namespace: env.GetCurrentNamespace(),
},
Spec: odigosv1.CollectorsGroupSpec{
Role: odigosv1.CollectorsGroupRoleNodeCollector,
CollectorOwnMetricsPort: ownMetricsPort,
ResourcesSettings: getMemorySettings(odigosConfig),
},
}
}
Expand Down
9 changes: 9 additions & 0 deletions tests/common/assert/pipeline-ready.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ spec:
resources:
requests:
(memory != null): true
limits:
(memory != null): true
volumeMounts:
- mountPath: /conf
name: collector-conf
Expand Down Expand Up @@ -150,6 +152,13 @@ spec:
fieldRef:
apiVersion: v1
fieldPath: metadata.name
- name: GOMEMLIMIT
(value != null): true
resources:
requests:
(memory != null): true
limits:
(memory != null): true
hostNetwork: true
nodeSelector:
kubernetes.io/os: linux
Expand Down

0 comments on commit 6131199

Please sign in to comment.