Applying memory_limiter extension #9591

dmitryax opened this issue Feb 15, 2024 · 18 comments

@dmitryax (Member) commented Feb 15, 2024

Problem

As part of #8632, we want receivers to be able to reject incoming requests if the collector is running out of memory, instead of accepting the data, transforming it to OTLP, and having it rejected by the memory_limiter processor. Scraping receivers would skip scraping in that situation.

We introduced a memory_limiter extension in #8964, but it has not been applied yet.

#9397 proposes a way for receivers that use confighttp to be configured with a memory_limiter extension explicitly. Applying only this approach to other receivers would complicate the user configuration interface, as users would have to explicitly connect every receiver to a memory_limiter extension.

Proposed solution

We can introduce an option to configure the memory_limiter extension so that it's applied to every receiver. Particular receivers can override the default memory limiter and connect to a different one.

Configuration example:

extensions:
  memory_limiter:
    limit_mib: 800
    apply_to_all_receivers: true
  memory_limiter/internal_metrics:
    limit_mib: 1000

receivers:
  otlp:
    protocols:
      grpc:
        endpoint: localhost:4317
      http:
        endpoint: localhost:4318
  prometheus:
    memory_limiter: memory_limiter/internal_metrics
    config:
      scrape_configs:
        - job_name: 'otel-collector'
          scrape_interval: 10s
          static_configs:
            - targets: ["localhost:8888"]

processors:
  batch:

exporters:
  debug:
    verbosity: detailed

service:
  extensions: [memory_limiter, memory_limiter/internal_metrics]
  pipelines:
    traces:
      receivers: [otlp]
      processors: [batch]
      exporters: [debug]
    metrics:
      receivers: [otlp, prometheus]
      processors: [batch]
      exporters: [debug]

In this configuration, the default memory_limiter extension is implicitly applied to the otlp receiver, and the additional memory_limiter/internal_metrics extension is applied explicitly to the prometheus receiver.

The user cannot define more than one memory_limiter extension with apply_to_all_receivers: true; the config validation will fail.

If a memory_limiter extension with apply_to_all_receivers: true is defined, all the receivers used in the pipelines must support the memory_limiter extension; otherwise, the collector won't start.
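
For illustration, a receiver (or a future receiver helper) could resolve and consult the extension roughly like this (sketch only; the memoryLimiter interface and helper names are illustrative, not a final API):

package sketch

import (
	"context"
	"errors"

	"go.opentelemetry.io/collector/component"
)

// memoryLimiter is the minimal behavior a receiver would rely on.
type memoryLimiter interface {
	MustRefuse() bool
}

// resolveMemoryLimiter returns the limiter configured for this receiver, or the
// default one (the extension flagged with apply_to_all_receivers) if the
// receiver does not override it.
func resolveMemoryLimiter(host component.Host, override *component.ID) (memoryLimiter, error) {
	id := component.MustNewID("memory_limiter")
	if override != nil {
		id = *override
	}
	ext, ok := host.GetExtensions()[id]
	if !ok {
		return nil, errors.New("memory_limiter extension not found: " + id.String())
	}
	ml, ok := ext.(memoryLimiter)
	if !ok {
		return nil, errors.New("extension is not a memory limiter: " + id.String())
	}
	return ml, nil
}

// consumeIfAllowed shows where the check would happen on the data path.
func consumeIfAllowed(ctx context.Context, ml memoryLimiter, push func(context.Context) error) error {
	if ml.MustRefuse() {
		return errors.New("data refused due to high memory usage")
	}
	return push(ctx)
}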

Proof of concept: #9590

@TylerHelmuth (Member)

@dmitryax should this functionality exist in the receiverhelper, similar to how batching is being added to the exporterhelper?

@dmitryax (Member Author) commented Feb 15, 2024

@TylerHelmuth, yes, we can introduce a receiver helper. It doesn't exist at this point. It may be a cleaner implementation for the memory_limiter. If we think it can help with other problems in the future, we can add it. Otherwise, it can also be an unnecessary user interface complication. I'll look into that.

@dmitryax (Member Author)

First, I would like us to agree on the user interface and the way the memory_limiter extension can be enabled for all receivers.

@TylerHelmuth (Member) commented Feb 15, 2024

Instead of needing apply_to_all_receivers: true, could the extension component named memory_limiter be applied to all receivers unless the component specifies a different ID?

Pros:

  • more backwards compatible
  • users wouldn't need to remember to set the boolean to get the feature
  • Less config

Cons:

  • config is less self-describing

@dmitryax (Member Author)

more backwards compatible

How is it more backward compatible? The extension is a no-op at this point.

How do you know which one is applied to all receivers if I have, let's say, two memory_limiter extensions defined? By looking at each receiver and filtering out the one that is not referenced by any of them? This seems like too much magic.

@jpkrohling (Member)

I have a pending proposal for supporting interceptors (#7441), but I've failed to dedicate engineering time to a PoC. While I'm not too tied to that specific proposal, I think this could be split into two parts: one more generic being part of the Collector first, and one with the memory limiter being one implementation.

@TylerHelmuth (Member) commented Feb 16, 2024

How is it more backward compatible? The extension is a no-op at this point.

It is closer to what users do now with the processor, so changing it to an extension would be a little easier, but not significantly so.

This seems like too much magic.

Ya probably

@dmitryax (Member Author) commented Feb 16, 2024

one more generic being part of the Collector first

@jpkrohling what do you mean by this?

The middleware proposal makes sense to me, but it's not applicable to all receivers, e.g., scraping receivers. The memory_limiter is supposed to be supported by all of them. It's also important to apply the memory limiter before any other middleware for push-based receivers. So I think it's ok to keep them separate.

@jpkrohling (Member)

one more generic being part of the Collector first

Sorry, I mean a general mechanism (interceptors/middleware/...), plus an implementation of it. I agree about the scraping one, though.

but it's not applicable to all receivers, e.g., scraping receivers

Stupid question, but isn't a memory limiter being used in a scraping receiver similar in usage to an exporter, in the sense that they both could intercept an outgoing HTTP/gRPC request? Perhaps there could be two interceptor interfaces, one for servers (typical receivers) and one for clients (scraping receivers and typical exporters)?

@dmitryax (Member Author)

Stupid question, but isn't a memory limiter being used in a scraping receiver similar in usage to an exporter, in the sense that they both could intercept an outgoing HTTP/gRPC request? Perhaps there could be two interceptor interfaces, one for servers (typical receivers) and one for clients (scraping receivers and typical exporters)?

Scraping doesn't always involve making HTTP/gRPC requests to an external endpoint. Scrapers can read logs or metrics from local files. They can also use third-party libraries talking to external APIs, so it'll be hard to apply interceptors.

I think scraping receivers need to skip scrapes on the application level once the memory limit is reached and avoid even attempting any external calls.
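
For illustration, a scrape loop guarded by the limiter could look roughly like this (simplified sketch, not actual collector code; the memoryLimiter and scraper types are placeholders):

package sketch

import (
	"context"
	"log"
	"time"
)

// memoryLimiter is the minimal behavior the scrape loop relies on.
type memoryLimiter interface {
	MustRefuse() bool
}

// scraper stands in for any scraping receiver's scrape function.
type scraper interface {
	Scrape(ctx context.Context) error
}

// runScrapeLoop skips the whole scrape cycle, including any outgoing calls,
// while the limiter reports that the memory limit is reached.
func runScrapeLoop(ctx context.Context, s scraper, ml memoryLimiter, interval time.Duration) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			if ml.MustRefuse() {
				log.Println("skipping scrape: memory limit reached")
				continue
			}
			if err := s.Scrape(ctx); err != nil {
				log.Println("scrape failed:", err)
			}
		}
	}
}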

@moh-osman3 (Contributor)

Hey @dmitryax @timannguyen, thanks for all your work to create this memorylimiterextension and to make it usable by receivers. I just wanted to check in and see if there's anything I can do to help make this memorylimiterextension usable?

Our team is struggling with service reliability because of memory/latency issues due to large requests being accepted by the receiver (despite an in-flight-bytes memory limiter we apply in a custom processor). We want to use the core collector memorylimiterextension or create a custom extension to apply our limiter at the start of our pipeline. We use an otelarrow receiver, which sets up a gRPC server, and it seems that we could make use of configgrpc and interceptors to apply the memorylimiterextension.

  1. Is there anything blocking this PR from being merged? [config/confighttp] add memorylimiterextension to confighttp #9397
  2. May I implement something similar for configgrpc so our gRPC servers can leverage memory_limiter extensions?

@dmitryax (Member Author) commented Mar 7, 2024

Hi @moh-osman3, thank you for your interest. I'll be happy to work on this together.

#9673 and #9397 add a configuration option right in confighttp and configgrpc. It's a good option if users want to enable the memory_limiter explicitly. However, I believe it's important to provide a way to apply a memory_limiter extension to all the receivers used in the collector pipelines, as proposed in this issue.

I tried to prototype this approach in #9590, and it seems like it doesn't work with having the memory_limiter option right in the confighttp and configgrpc configs. Likely, every receiver has to have this option added separately in its config. That's why I'm hesitant to merge these PRs, to avoid breaking them down the road. If you can find a way to marry the confighttp and configgrpc config with the approach proposed in this PR, I'll be happy to move it forward.

@moh-osman3 (Contributor) commented Mar 14, 2024

I tried to prototype this approach in #9590, and it seems like it doesn't work with having the memory_limiter option right in the confighttp and configgrpc configs.

@dmitryax Hmm, I've checked out your branch and run the code. It seems to me that things are working with configgrpc/confighttp, i.e., the interceptor is correctly applying the overriding memorylimiterextension you configure in the otlpreceiver, but the issue is the global memory limiter being added in

service:
  extensions: [memory_limiter, memory_limiter/internal_metrics]

will cause the global memorylimiterextension to start up and apply garbage collection if the global memory limit is exceeded, regardless of what is going on in the interceptors for the otlpreceiver. This will happen if the global memorylimiterextension has a stricter memory limit than the overriding extension.

i.e. memorylimiter.Start() -> memorylimiter.CheckMemLimits() -> ml.doGCAndReadMemStats()

This seems to be an issue with using the memorylimiter processor logic directly. Is garbage collection even necessary in the extension itself? I.e., we just want to monitor the current memory usage and reject/retry based on whether the next request will push us over the limit. I think each receiver would then have to check if there is a global memorylimiterextension and apply it. Otherwise, I'm not sure it's possible to have multiple memorylimiter extensions.

I'm working on implementing a custom memorylimiterextension that uses a weighted semaphore of size limit_mib to control admission into the receiver. For this custom memorylimiterextension, MustRefuse() will be based on whether sem.TryAcquire() succeeds for the size of the request. I think this removes the need for garbage collection in the original memorylimiter processor logic and will allow us to successfully override the global memorylimiterextension.
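
Roughly, the admission logic I have in mind looks like this (simplified sketch using golang.org/x/sync/semaphore; the type and method names are illustrative, not the actual extension API):

package sketch

import (
	"context"
	"errors"

	"golang.org/x/sync/semaphore"
)

// semaphoreLimiter admits requests only while the total admitted bytes stay
// under the configured limit.
type semaphoreLimiter struct {
	sem *semaphore.Weighted
}

// newSemaphoreLimiter sizes the semaphore in bytes from limit_mib.
func newSemaphoreLimiter(limitMiB int64) *semaphoreLimiter {
	return &semaphoreLimiter{sem: semaphore.NewWeighted(limitMiB << 20)}
}

// MustRefuse reports whether admitting a request of reqBytes would exceed the
// limit. On success the caller must Release the same size when done.
func (l *semaphoreLimiter) MustRefuse(reqBytes int64) bool {
	return !l.sem.TryAcquire(reqBytes)
}

func (l *semaphoreLimiter) Release(reqBytes int64) {
	l.sem.Release(reqBytes)
}

// handle shows the intended usage inside a request handler.
func handle(ctx context.Context, l *semaphoreLimiter, reqBytes int64, process func(context.Context) error) error {
	if l.MustRefuse(reqBytes) {
		return errors.New("request refused: memory limit reached")
	}
	defer l.Release(reqBytes)
	return process(ctx)
}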

Any thoughts on this?

@dmitryax (Member Author) commented Mar 19, 2024

@moh-osman3, thanks for your interest.

I don't see a lot of value in the GC capability of the extension. It was just copied from the processor as is. I think users should be good just with using GOMEMLIMIT instead. I think we can update the extension just to provide the MustRefuse() and remove the GC capability while keeping it in the processor to preserve the existing behavior of a popular component.

That will simplify further implementation of the extension. It can be a good first step. It'd be great if you could help with a PR.

Another option is to apply GC only if apply_to_all_receivers: true, but I'm not sure that is important enough.

@moh-osman3 (Contributor)

I think users should be good just with using GOMEMLIMIT instead

@dmitryax Can you expand on what you mean here? Are you saying that GOMEMLIMIT should replace the global memorylimiterextension? So it should be set as an overall limit for the entire collector binary, while the memorylimiterextension will be a more local memory limit explicitly configured in the component itself?

Happy to help with a PR to remove the garbage collection in the extension. Related to this: something that seems troubling about the memorylimiter is that it seems possible for memory usage to exceed the configured limit_mib, which was why garbage collection was triggered in the processor. Is it possible to change the function signature of MustRefuse to also size the request and take the request size into consideration when refusing/accepting requests?

i.e. change from func (ml *memorylimiterextension) MustRefuse() bool to
func (ml *memorylimiterextension) MustRefuse(req any) bool?

@moh-osman3 (Contributor) commented Apr 3, 2024

@dmitryax I'm still wondering if we can have multiple modes of memorylimiting.

  1. The existing one based on polling the runtime statistics to see if we are above the configured memory limits.
  2. One based on per-item memory limiting that sizes the request and rejects if the next request puts us over the limit.

Relying on method (1) seems like it might be costly, since it continuously calls runtime.ReadMemStats(), and it may still accept large or frequent requests that OOM/stall your application if the poll interval is not sufficiently small. The smaller the poll interval, the more expensive the memorylimiterextension becomes due to the increase in runtime.ReadMemStats() calls. Method (2) will allow the server to know the size of every request before admitting it into the receiver, and it can even block until there is enough available memory to process the request (though the number of waiters should probably be limited). This seems like it will make the server more resilient and allow more flexibility for users to configure a memory_limit_strategy that suits their needs. Users can still set GOMEMLIMIT to limit their collectors globally in both strategies.

I need some help here to figure out an interface that would allow for the use of multiple strategies. It seems like the current interface of

type memorylimiterextension interface {
    Start(ctx context.Context, host component.Host) error
    Shutdown(ctx context.Context) error
    MustRefuse() bool
}

won't work for strategy (2) because we need to supply more information from the interceptor to account for request size in our decision to accept/refuse the request, and we need to store a running count of memory usage (and decrement this count when the handler returns).

Maybe we can embed a more general interface for grpc servers to supply their own interceptors based on the memorylimit strategy.

type grpcServerInterceptor interface {
    // Return types as implied by the configgrpc usage below.
    UnaryInterceptorGenerator() grpc.UnaryServerInterceptor
    StreamInterceptorGenerator() grpc.StreamServerInterceptor
}

This would simplify things in configgrpc

if gss.MemoryLimiter != nil {
    memoryLimiter, err := getMemoryLimiterExtension(gss.MemoryLimiter, host.GetExtensions())
    if err != nil {
        return nil, err
    }

    uInterceptors = append(uInterceptors, memoryLimiter.UnaryInterceptorGenerator())
    sInterceptors = append(sInterceptors, memoryLimiter.StreamInterceptorGenerator())
}
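
And the extension side could implement the generator roughly like this (hypothetical sketch, not an existing API; only the unary case is shown):

package sketch

import (
	"context"

	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// memLimiterExt stands in for a memory limiter extension.
type memLimiterExt struct{}

// MustRefuse would consult memory usage; it is stubbed out here.
func (m *memLimiterExt) MustRefuse() bool { return false }

// UnaryInterceptorGenerator returns an interceptor that rejects requests with
// RESOURCE_EXHAUSTED while the limiter reports memory pressure.
func (m *memLimiterExt) UnaryInterceptorGenerator() grpc.UnaryServerInterceptor {
	return func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) {
		if m.MustRefuse() {
			return nil, status.Error(codes.ResourceExhausted, "memory limit reached, try again later")
		}
		return handler(ctx, req)
	}
}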

My goal here is not to overcomplicate things but rather provide more flexibility to solving memory related incidents we've experienced in production. I would love any suggestions about how we can extend this memorylimiterextension to support strategy (2) in the future, or if this would need to be a separate component altogether.

@jmacd (Contributor) commented Apr 5, 2024

Wanting to keep this thread alive-- I've caught up with all the discussion and feel we are heading in the right direction.

I definitely support @moh-osman3's point that the memory limiter should have access to something transport-specific in order to intercept the data as it performs the limiting function. I'm not sure that a (data any) formal parameter is the right way, however. Perhaps we can find a way to bind transport-specific memory limiter functions, to align existing work on HTTP and gRPC interceptors with other transports, protocols, and sources of telemetry data. For example, if we are limiting a component that reads local files, an interceptor is likely to look like io.Reader, so the memory limiter will want to intercept Read([]byte). We will have memory limiter interfaces based on the type of transport, and each memory limiter extension will want to implement all the interfaces it can.
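
For example, a reader-level interceptor might look roughly like this (sketch only; names are hypothetical):

package sketch

import (
	"errors"
	"io"
)

// memoryLimiter is the minimal behavior the reader wrapper relies on.
type memoryLimiter interface {
	MustRefuse() bool
}

var errMemoryLimitReached = errors.New("read refused: memory limit reached")

// limitedReader consults the limiter before each read, so a file-based source
// backs off as soon as the collector is under memory pressure.
type limitedReader struct {
	r  io.Reader
	ml memoryLimiter
}

func (lr *limitedReader) Read(p []byte) (int, error) {
	if lr.ml.MustRefuse() {
		return 0, errMemoryLimitReached
	}
	return lr.r.Read(p)
}

// wrapReader is what an io.Reader-flavored memory limiter interface could return.
func wrapReader(r io.Reader, ml memoryLimiter) io.Reader {
	return &limitedReader{r: r, ml: ml}
}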

@dmitryax Regarding this apply_to_all_receivers: true suggestion, I wonder if the top-level service configuration ought to contain a global memory_limiter configuration, as the PRs linked above do for configgrpc and confighttp? I mean to suggest that the configuration logic that applies memory limiters would first consult the relevant component-level memory_limiter (which may or may not be embedded in a transport-common config struct, such as configgrpc or confighttp), and if no component-level limiter is configured, then it would consult the top-level service::memory_limiter.

Thanks, all!

@dmitryax (Member Author)

@moh-osman3 I understand the problem with the existing implementation that you're trying to solve. Checking the current memory utilization periodically is not ideal. It's pretty hard to configure a spike_limit_mib that always covers the maximum possible memory jump between measurements. I'm up for finding a solution to that. I don't think we need different modes/extensions. We have the same goal. I like @jmacd's suggestion of tracking it closer to the wire. However, I have a couple of concerns about the memory_limiter tracking the memory utilization by itself:

  1. The measurements from the input will significantly differ from the actual memory utilization down the pipeline, especially for non-otlp/grpc receivers, where we have to create the pdata and keep it along with the request until it's done.
  2. It won't work with any components that handle data asynchronously, e.g., the tailsampling processor, groupbytrace processor, deltatocumulative processor, etc. Most importantly, it won't work with the queue on the exporter side. Users would have to pick a value for the memory limiter by subtracting all the configured queue sizes. Not very convenient.

I think we can try to find some middle ground here. I'd like to keep the simplicity of the current approach and make it more reliable against spikes. Even if we go with taking the request size as input for the refusal decision, we can track it only between the runtime measurements, which will still be used as the source of truth and will "reset" the tracker.
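
Roughly something like this (an illustrative sketch only; names and details are made up):

package sketch

import (
	"runtime"
	"sync/atomic"
)

// hybridLimiter keeps periodic runtime.ReadMemStats as the source of truth and
// tracks admitted request bytes between measurements to catch spikes.
type hybridLimiter struct {
	limitBytes    uint64
	lastHeapAlloc atomic.Uint64 // heap usage from the most recent measurement
	admittedSince atomic.Int64  // bytes admitted since that measurement
}

// refreshMemStats runs on the poll interval and resets the in-between tracker.
func (h *hybridLimiter) refreshMemStats() {
	var ms runtime.MemStats
	runtime.ReadMemStats(&ms)
	h.lastHeapAlloc.Store(ms.HeapAlloc)
	h.admittedSince.Store(0)
}

// MustRefuse combines the last measurement with the bytes admitted since then.
// The check-then-add is not atomic; a real implementation would need to handle
// concurrent admissions more carefully.
func (h *hybridLimiter) MustRefuse(reqBytes int64) bool {
	estimated := h.lastHeapAlloc.Load() + uint64(h.admittedSince.Load()) + uint64(reqBytes)
	if estimated > h.limitBytes {
		return true
	}
	h.admittedSince.Add(reqBytes)
	return false
}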

blumamir added a commit to odigos-io/odigos that referenced this issue Dec 4, 2024
…1827)

## Problem

At the moment, if there is pressure in the pipeline for any reason and
batches fail to export, they will start building up in the queues of the
collector exporter and grow memory without bound.

Since we don't set any memory request or limit on the node collectors
DaemonSet, they will just go on to consume more and more of the available
memory on the node:

1. It will show a spike in resource consumption in the cluster metrics.
2. It will starve other pods on the same node, which now has less spare memory
to grow into.
3. If the issue is not transient, the memory will just keep increasing
over time.
4. The amount of data in the retry buffers will keep the CPU busy
attempting to retry the rejected or unsuccessful batches.

## Levels of Protections

To prevent the above issues, we apply a few levels of protection, listed
from first line of defense to last resort:

1. Setting GOMEMLIMIT to a (currently hardcoded) `352MiB`. At this
point, the Go runtime GC should kick in and start reclaiming memory
aggressively.
2. Setting the otel collector soft limit to a (currently hardcoded)
`384MiB`. When heap allocations reach this amount, the collector
will start dropping batches of data after they are exported from the
`batch` processor, instead of streaming them down the pipeline.
3. Setting the otel collector hard limit to `512MiB`. When the heap
reaches this number, a forced GC is performed.
4. Setting the memory request to `256MiB`. This ensures we have at least
this amount of memory to handle normal traffic and some slack for spikes
without running into OOM. The rest of the memory is consumed from the
available memory on the node, which comes in handy for more buffering but
may also cause OOM if the node has no spare resources.

## Future Work

- Add configuration options to set these values, preferably as a
spectrum of trade-offs: "resource-stability", "resource-spikecapacity"

- Drop the data as it is received, not after it is batched -
open-telemetry/opentelemetry-collector#11726

- Drop data at the receiver when it's implemented in the collector -
open-telemetry/opentelemetry-collector#9591