Enable queueing, retry, timeout for Kafka exporter #1455

Merged: 1 commit, Aug 13, 2020
21 changes: 18 additions & 3 deletions exporter/kafkaexporter/README.md
@@ -12,10 +12,25 @@ The following settings are required:
The following settings can be optionally configured:
- `brokers` (default = localhost:9092): The list of kafka brokers
- `topic` (default = otlp_spans): The name of the kafka topic to export to
-- `metadata.full` (default = true): Whether to maintain a full set of metadata.
+- `metadata`
+  - `full` (default = true): Whether to maintain a full set of metadata.
    When disabled, the client does not make the initial request to the broker at startup.
-- `metadata.retry.max` (default = 3): The number of retries to get metadata
-- `metadata.retry.backoff` (default = 250ms): How long to wait between metadata retries
+  - `retry`
+    - `max` (default = 3): The number of retries to get metadata
+    - `backoff` (default = 250ms): How long to wait between metadata retries
+- `timeout` (default = 5s): The timeout for every attempt to send data to the backend.
+- `retry_on_failure`
+  - `enabled` (default = true)
+  - `initial_interval` (default = 5s): Time to wait after the first failure before retrying; ignored if `enabled` is `false`

Review comment (Member): Are the default values correct? Shouldn't we use the same defaults as the driver uses?
https://github.com/Shopify/sarama/blob/master/config.go#L462

Reply (Member, Author): You pointed to the timeout; this is the initial backoff delay after a request fails. We can tune the timeout to 10s if that is important, but I would leave it as is for the moment and maybe collect feedback.

+  - `max_interval` (default = 30s): The upper bound on backoff; ignored if `enabled` is `false`
+  - `max_elapsed_time` (default = 120s): The maximum amount of time spent trying to send a batch; ignored if `enabled` is `false`
+- `sending_queue`
+  - `enabled` (default = false)
+  - `num_consumers` (default = 10): Number of consumers that dequeue batches; ignored if `enabled` is `false`

Review comment (Member): Does it mean that 10 instances of the exporter will be created?

Reply (Member, Author): Just one instance, but there will be 10 goroutines consuming items from the in-memory queue (similar to the queued-retry processor, but per exporter), all calling into the single exporter instance that was created. (See the Go sketch just after this settings list.)

+- `queue_size` (default = 5000): Maximum number of batches kept in memory before dropping data; ignored if `enabled` is `false`.
+  Users should calculate this as `num_seconds * requests_per_second` where:
+  - `num_seconds` is the number of seconds to buffer in case of a backend outage
+  - `requests_per_second` is the average number of requests per second
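
To ground the reply above (the sketch it references): a minimal Go illustration of the pattern, not the actual exporterhelper code. `batch`, `exporter`, and `startConsumers` are invented placeholder names; the point is that `num_consumers` goroutines drain one shared in-memory queue and all call into the single exporter instance.

```go
package sketch

import "sync"

type batch struct{} // placeholder for a batch of spans

type exporter struct{} // the single exporter instance

func (e *exporter) export(batch) error { return nil } // real code would send to Kafka here

// startConsumers launches numConsumers goroutines that all dequeue from the
// same channel (the in-memory queue) and call into the one exporter instance.
func startConsumers(e *exporter, queue <-chan batch, numConsumers int) *sync.WaitGroup {
	wg := &sync.WaitGroup{}
	for i := 0; i < numConsumers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for b := range queue {
				_ = e.export(b) // retry-on-failure handling elided
			}
		}()
	}
	return wg
}
```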

Example configuration:
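
The README's full example is collapsed in this diff, so here is a sketch consistent with the defaults documented above; the broker, topic, and sizing values are illustrative assumptions, not the PR's own example. Applying the sizing rule above, buffering a 60-second outage at 80 requests per second suggests `queue_size: 4800`.

```yaml
exporters:
  kafka:
    brokers:
      - localhost:9092
    topic: otlp_spans
    timeout: 5s
    retry_on_failure:
      enabled: true
      initial_interval: 5s
      max_interval: 30s
      max_elapsed_time: 120s
    sending_queue:
      enabled: true
      num_consumers: 10
      queue_size: 4800
```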

7 changes: 6 additions & 1 deletion exporter/kafkaexporter/config.go
@@ -18,11 +18,16 @@ import (
"time"

"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/exporter/exporterhelper"
)

// Config defines configuration for Kafka exporter.
type Config struct {
-configmodels.ExporterSettings `mapstructure:",squash"`
+configmodels.ExporterSettings  `mapstructure:",squash"`
+exporterhelper.TimeoutSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct.
+exporterhelper.QueueSettings   `mapstructure:"sending_queue"`
+exporterhelper.RetrySettings   `mapstructure:"retry_on_failure"`
Comment on lines +27 to +29
Review comment (Member): Since all (?) of the exporters are going to need all 3 settings, perhaps introduce a struct which embeds all 3 and use it. Can be done in a future PR. (See the hypothetical sketch after this file's diff.)


// The list of kafka brokers (default localhost:9092)
Brokers []string `mapstructure:"brokers"`
// Kafka protocol version
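
The reviewer's suggestion above, deferred to a future PR, might look roughly like the following. This is a hypothetical sketch; `CommonExporterSettings` is an invented name, not an existing collector type.

```go
// CommonExporterSettings would bundle the three exporterhelper settings that
// most exporters need, so each exporter's Config embeds one struct instead of
// three. Hypothetical; not part of this PR.
type CommonExporterSettings struct {
	exporterhelper.TimeoutSettings `mapstructure:",squash"`
	exporterhelper.QueueSettings   `mapstructure:"sending_queue"`
	exporterhelper.RetrySettings   `mapstructure:"retry_on_failure"`
}
```
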
16 changes: 16 additions & 0 deletions exporter/kafkaexporter/config_test.go
@@ -17,13 +17,15 @@ package kafkaexporter
import (
"path"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/config/configtest"
"go.opentelemetry.io/collector/exporter/exporterhelper"
)

func TestLoadConfig(t *testing.T) {
@@ -42,6 +44,20 @@ func TestLoadConfig(t *testing.T) {
NameVal: typeStr,
TypeVal: typeStr,
},
+TimeoutSettings: exporterhelper.TimeoutSettings{
+	Timeout: 10 * time.Second,
+},
+RetrySettings: exporterhelper.RetrySettings{
+	Enabled:         true,
+	InitialInterval: 10 * time.Second,
+	MaxInterval:     1 * time.Minute,
+	MaxElapsedTime:  10 * time.Minute,
+},
+QueueSettings: exporterhelper.QueueSettings{

Review comment (Member): Are the default values based on something, or just common sense (also for the retry)?

Reply (Member, Author): Just common sense. There is documentation on how to configure the sending queue:
https://github.com/open-telemetry/opentelemetry-collector/blob/master/exporter/exporterhelper/queued_retry.go#L43

+	Enabled:      true,
+	NumConsumers: 2,

Review comment (Member): Shouldn't this be one?

Reply (Member, Author): Should it? It is 2 here because that's what I put in the yaml.

Review comment (Member): Looks good, I hadn't noticed that this is a test 😟

+	QueueSize: 10,
+},
Topic: "spans",
Brokers: []string{"foo:123", "bar:456"},
Metadata: Metadata{
19 changes: 15 additions & 4 deletions exporter/kafkaexporter/factory.go
@@ -44,13 +44,19 @@ func NewFactory() component.ExporterFactory {
}

func createDefaultConfig() configmodels.Exporter {
+// TODO: Enable the queued settings by default.
+qs := exporterhelper.CreateDefaultQueueSettings()
+qs.Enabled = false
return &Config{
ExporterSettings: configmodels.ExporterSettings{
TypeVal: typeStr,
NameVal: typeStr,
},
-Brokers: []string{defaultBroker},
-Topic:   defaultTopic,
+TimeoutSettings: exporterhelper.CreateDefaultTimeoutSettings(),
+RetrySettings:   exporterhelper.CreateDefaultRetrySettings(),
+QueueSettings:   qs,
+Brokers:         []string{defaultBroker},
+Topic:           defaultTopic,
Metadata: Metadata{
Full: defaultMetadataFull,
Retry: MetadataRetry{
@@ -66,13 +72,18 @@ func createTraceExporter(
params component.ExporterCreateParams,
cfg configmodels.Exporter,
) (component.TraceExporter, error) {
-c := cfg.(*Config)
-exp, err := newExporter(*c, params)
+oCfg := cfg.(*Config)
+exp, err := newExporter(*oCfg, params)
if err != nil {
return nil, err
}
return exporterhelper.NewTraceExporter(
cfg,
exp.traceDataPusher,
+// Disable exporterhelper Timeout, because we cannot pass a Context to the Producer,
+// and will rely on the sarama Producer Timeout logic.
+exporterhelper.WithTimeout(exporterhelper.TimeoutSettings{Timeout: 0}),
+exporterhelper.WithRetry(oCfg.RetrySettings),
+exporterhelper.WithQueue(oCfg.QueueSettings),
exporterhelper.WithShutdown(exp.Close))
}
2 changes: 2 additions & 0 deletions exporter/kafkaexporter/kafka_exporter.go
@@ -41,6 +41,8 @@ func newExporter(config Config, params component.ExporterCreateParams) (*kafkaPr
c.Producer.Return.Errors = true
// Wait only the local commit to succeed before responding.
c.Producer.RequiredAcks = sarama.WaitForLocal
+// Because sarama does not accept a Context for every message, set the Timeout here.

Review comment (Member): Not sure what you mean by a Context.

Reply (Member, Author): SendRequest does not accept a context.Context as an argument, so I cannot pass the Timeout the standard Go way, via a context.Context. (See the sketch after this file's diff.)

+c.Producer.Timeout = config.Timeout
c.Metadata.Full = config.Metadata.Full
c.Metadata.Retry.Max = config.Metadata.Retry.Max
c.Metadata.Retry.Backoff = config.Metadata.Retry.Backoff
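
To ground the thread above: sarama's SyncProducer exposes no per-call context.Context, so a deadline cannot be attached to each send the standard Go way; the bound is set once on the producer configuration instead. A minimal sketch, assuming sarama's NewConfig and NewSyncProducer APIs; `newBoundedProducer` is an invented helper name.

```go
package sketch

import (
	"time"

	"github.com/Shopify/sarama"
)

// newBoundedProducer sets the send timeout once, on the configuration, because
// SyncProducer.SendMessage takes no context through which a per-call deadline
// could be passed.
func newBoundedProducer(brokers []string, timeout time.Duration) (sarama.SyncProducer, error) {
	c := sarama.NewConfig()
	c.Producer.Timeout = timeout       // bounds each produce request
	c.Producer.Return.Successes = true // required by the SyncProducer wrapper
	c.Producer.Return.Errors = true
	return sarama.NewSyncProducer(brokers, c)
}
```
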
9 changes: 7 additions & 2 deletions exporter/kafkaexporter/kafka_exporter_test.go
@@ -46,7 +46,9 @@ func TestTraceDataPusher(t *testing.T) {
producer: producer,
marshaller: &protoMarshaller{},
}
-defer p.Close(context.Background())
+t.Cleanup(func() {
+	require.NoError(t, p.Close(context.Background()))
+})
droppedSpans, err := p.traceDataPusher(context.Background(), testdata.GenerateTraceDataTwoSpansSameResource())
require.NoError(t, err)
assert.Equal(t, 0, droppedSpans)
@@ -63,7 +65,9 @@ func TestTraceDataPusher_err(t *testing.T) {
marshaller: &protoMarshaller{},
logger: zap.NewNop(),
}
-defer p.Close(context.Background())
+t.Cleanup(func() {
+	require.NoError(t, p.Close(context.Background()))
+})
td := testdata.GenerateTraceDataTwoSpansSameResource()
droppedSpans, err := p.traceDataPusher(context.Background(), td)
assert.EqualError(t, err, expErr.Error())
@@ -78,6 +82,7 @@ func TestTraceDataPusher_marshall_error(t *testing.T) {
}
td := testdata.GenerateTraceDataTwoSpansSameResource()
droppedSpans, err := p.traceDataPusher(context.Background(), td)
+require.Error(t, err)
bogdandrutu marked this conversation as resolved.
assert.Contains(t, err.Error(), expErr.Error())
assert.Equal(t, td.SpanCount(), droppedSpans)
}
10 changes: 10 additions & 0 deletions exporter/kafkaexporter/testdata/config.yaml
@@ -8,6 +8,16 @@ exporters:
      full: false
      retry:
        max: 15
+    timeout: 10s
+    sending_queue:
+      enabled: true
+      num_consumers: 2
+      queue_size: 10
+    retry_on_failure:
+      enabled: true
+      initial_interval: 10s
+      max_interval: 60s
+      max_elapsed_time: 10m

processors:
  exampleprocessor: