-
Notifications
You must be signed in to change notification settings - Fork 1.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Enable queueing, retry, timeout for Kafka exporter #1455
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -12,10 +12,25 @@ The following settings are required: | |
The following settings can be optionally configured: | ||
- `brokers` (default = localhost:9092): The list of kafka brokers | ||
- `topic` (default = otlp_spans): The name of the kafka topic to export to | ||
- `metadata.full` (default = true): Whether to maintain a full set of metadata. | ||
- `metadata` | ||
- `full` (default = true): Whether to maintain a full set of metadata. | ||
When disabled the client does not make the initial request to broker at the startup. | ||
- `metadata.retry.max` (default = 3): The number of retries to get metadata | ||
- `metadata.retry.backoff` (default = 250ms): How long to wait between metadata retries | ||
- `retry` | ||
- `max` (default = 3): The number of retries to get metadata | ||
- `backoff` (default = 250ms): How long to wait between metadata retries | ||
- `timeout` (default = 5s): Is the timeout for every attempt to send data to the backend. | ||
- `retry_on_failure` | ||
- `enabled` (default = true) | ||
- `initial_interval` (default = 5s): Time to wait after the first failure before retrying; ignored if `enabled` is `false` | ||
- `max_interval` (default = 30s): Is the upper bound on backoff; ignored if `enabled` is `false` | ||
- `max_elapsed_time` (default = 120s): Is the maximum amount of time spent trying to send a batch; ignored if `enabled` is `false` | ||
- `sending_queue` | ||
- `enabled` (default = false) | ||
- `num_consumers` (default = 10): Number of consumers that dequeue batches; ignored if `enabled` is `false` | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Does it mean that 10 instances of the exporter will be created? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Just one instance, but will be 10 goroutines consuming items from the in-memory queue (similar with queue retry processor, but per exporter) and calling into the only one instance of the exporter created. |
||
- `queue_size` (default = 5000): Maximum number of batches kept in memory before dropping data; ignored if `enabled` is `false`; | ||
User should calculate this as `num_seconds * requests_per_second` where: | ||
- `num_seconds` is the number of seconds to buffer in case of a backend outage | ||
- `requests_per_second` is the average number of requests per seconds. | ||
|
||
Example configuration: | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,11 +18,16 @@ import ( | |
"time" | ||
|
||
"go.opentelemetry.io/collector/config/configmodels" | ||
"go.opentelemetry.io/collector/exporter/exporterhelper" | ||
) | ||
|
||
// Config defines configuration for Kafka exporter. | ||
type Config struct { | ||
configmodels.ExporterSettings `mapstructure:",squash"` | ||
configmodels.ExporterSettings `mapstructure:",squash"` | ||
exporterhelper.TimeoutSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct. | ||
exporterhelper.QueueSettings `mapstructure:"sending_queue"` | ||
exporterhelper.RetrySettings `mapstructure:"retry_on_failure"` | ||
Comment on lines
+27
to
+29
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since all (?) of the exporters are going to need all 3 settings perhaps introduce a struct which embeds all 3 and use. Can be done in a future PR. |
||
|
||
// The list of kafka brokers (default localhost:9092) | ||
Brokers []string `mapstructure:"brokers"` | ||
// Kafka protocol version | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,13 +17,15 @@ package kafkaexporter | |
import ( | ||
"path" | ||
"testing" | ||
"time" | ||
|
||
"github.com/stretchr/testify/assert" | ||
"github.com/stretchr/testify/require" | ||
|
||
"go.opentelemetry.io/collector/component/componenttest" | ||
"go.opentelemetry.io/collector/config/configmodels" | ||
"go.opentelemetry.io/collector/config/configtest" | ||
"go.opentelemetry.io/collector/exporter/exporterhelper" | ||
) | ||
|
||
func TestLoadConfig(t *testing.T) { | ||
|
@@ -42,6 +44,20 @@ func TestLoadConfig(t *testing.T) { | |
NameVal: typeStr, | ||
TypeVal: typeStr, | ||
}, | ||
TimeoutSettings: exporterhelper.TimeoutSettings{ | ||
Timeout: 10 * time.Second, | ||
}, | ||
RetrySettings: exporterhelper.RetrySettings{ | ||
Enabled: true, | ||
InitialInterval: 10 * time.Second, | ||
MaxInterval: 1 * time.Minute, | ||
MaxElapsedTime: 10 * time.Minute, | ||
}, | ||
QueueSettings: exporterhelper.QueueSettings{ | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Are the default values based on something or just common sense (also for the retry) ? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Just common sense. there is a documentation on how to configure the sending queue: |
||
Enabled: true, | ||
NumConsumers: 2, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Shouldn't this be one? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should it? it is 2 here because that's what I put in the yaml. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Looks good I haven't noticed that this is a test 😟 |
||
QueueSize: 10, | ||
}, | ||
Topic: "spans", | ||
Brokers: []string{"foo:123", "bar:456"}, | ||
Metadata: Metadata{ | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -41,6 +41,8 @@ func newExporter(config Config, params component.ExporterCreateParams) (*kafkaPr | |
c.Producer.Return.Errors = true | ||
// Wait only the local commit to succeed before responding. | ||
c.Producer.RequiredAcks = sarama.WaitForLocal | ||
// Because sarama does not accept a Context for every message, set the Timeout here. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. not sure what you mean by a Context There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. SendRequest does not accept a |
||
c.Producer.Timeout = config.Timeout | ||
c.Metadata.Full = config.Metadata.Full | ||
c.Metadata.Retry.Max = config.Metadata.Retry.Max | ||
c.Metadata.Retry.Backoff = config.Metadata.Retry.Backoff | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are the default values correct? Shouldn't we use the same defaults as the driver uses?
https://github.com/Shopify/sarama/blob/master/config.go#L462
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You pointed to the timeout this is the initial backoff delay after a request fail. We can tune the timeout to be 10sec if that is important, I would leave it as is for the moment and maybe collect feedback.