diff --git a/README.md b/README.md index 0a1a9b7..e32af6d 100644 --- a/README.md +++ b/README.md @@ -98,28 +98,28 @@ Check out on [go-dcp](https://github.com/Trendyol/go-dcp#configuration) ### Kafka Specific Configuration -| Variable | Type | Required | Default | Description | -|-------------------------------------|-------------------|----------|----------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| `kafka.collectionTopicMapping` | map[string]string | yes | | Defines which Couchbase collection events will be sent to which topic,:warning: **If topic information is entered in the mapper, it will OVERWRITE this config**. | -| `kafka.brokers` | []string | yes | | Broker ip and port information | -| `kafka.producerBatchSize` | integer | no | 2000 | Maximum message count for batch, if exceed flush will be triggered. | -| `kafka.producerBatchBytes` | 64 bit integer | no | 10485760 | Maximum size(byte) for batch, if exceed flush will be triggered. | -| `kafka.producerBatchTimeout` | time.duration | no | 1 nano second | Time limit on how often incomplete message batches will be flushed. | -| `kafka.producerMaxAttempts` | int | no | math.MaxInt | Limit on how many attempts will be made to deliver a message. | -| `kafka.producerBatchTickerDuration` | time.Duration | no | 10s | Batch is being flushed automatically at specific time intervals for long waiting messages in batch. | -| `kafka.readTimeout` | time.Duration | no | 30s | segmentio/kafka-go - Timeout for read operations | -| `kafka.writeTimeout` | time.Duration | no | 30s | segmentio/kafka-go - Timeout for write operations | -| `kafka.compression` | integer | no | 0 | Compression can be used if message size is large, CPU usage may be affected. 0=None, 1=Gzip, 2=Snappy, 3=Lz4, 4=Zstd | -| `kafka.requiredAcks` | integer | no | 1 | segmentio/kafka-go - Number of acknowledges from partition replicas required before receiving a response to a produce request. 0=fire-and-forget, do not wait for acknowledgements from the, 1=wait for the leader to acknowledge the writes, -1=wait for the full ISR to acknowledge the writes | -| `kafka.secureConnection` | bool | no | false | Enable secure Kafka. | -| `kafka.rootCAPath` | string | no | *not set | Define root CA path. | -| `kafka.interCAPath` | string | no | *not set | Define inter CA path. | -| `kafka.scramUsername` | string | no | *not set | Define scram username. | -| `kafka.scramPassword` | string | no | *not set | Define scram password. | -| `kafka.metadataTTL` | time.Duration | no | 60s | TTL for the metadata cached by segmentio, increase it to reduce network requests. For more detail please check [docs](https://pkg.go.dev/github.com/segmentio/kafka-go#Transport.MetadataTTL). | -| `kafka.metadataTopics` | []string | no | | Topic names for the metadata cached by segmentio, define topics here that the connector may produce. In large Kafka clusters, this will reduce memory usage. For more detail please check [docs](https://pkg.go.dev/github.com/segmentio/kafka-go#Transport.MetadataTopics). | -| `kafka.clientID` | string | no | | Unique identifier that the transport communicates to the brokers when it sends requests. For more detail please check [docs](https://pkg.go.dev/github.com/segmentio/kafka-go#Transport.ClientID). | -| `kafka.allowAutoTopicCreation` | bool | no | false | Create topic if missing. For more detail please check [docs](https://pkg.go.dev/github.com/segmentio/kafka-go#Writer.AllowAutoTopicCreation). | +| Variable | Type | Required | Default | Description | +|-------------------------------------|-------------------|----------|--------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| `kafka.collectionTopicMapping` | map[string]string | yes | | Defines which Couchbase collection events will be sent to which topic,:warning: **If topic information is entered in the mapper, it will OVERWRITE this config**. | +| `kafka.brokers` | []string | yes | | Broker ip and port information | +| `kafka.producerBatchSize` | integer | no | 2000 | Maximum message count for batch, if exceed flush will be triggered. | +| `kafka.producerBatchBytes` | 64 bit integer | no | 10mb | Maximum size(byte) for batch, if exceed flush will be triggered. `10mb` is default. | +| `kafka.producerBatchTimeout` | time.duration | no | 1 nano second | Time limit on how often incomplete message batches will be flushed. | +| `kafka.producerMaxAttempts` | int | no | math.MaxInt | Limit on how many attempts will be made to deliver a message. | +| `kafka.producerBatchTickerDuration` | time.Duration | no | 10s | Batch is being flushed automatically at specific time intervals for long waiting messages in batch. | +| `kafka.readTimeout` | time.Duration | no | 30s | segmentio/kafka-go - Timeout for read operations | +| `kafka.writeTimeout` | time.Duration | no | 30s | segmentio/kafka-go - Timeout for write operations | +| `kafka.compression` | integer | no | 0 | Compression can be used if message size is large, CPU usage may be affected. 0=None, 1=Gzip, 2=Snappy, 3=Lz4, 4=Zstd | +| `kafka.requiredAcks` | integer | no | 1 | segmentio/kafka-go - Number of acknowledges from partition replicas required before receiving a response to a produce request. 0=fire-and-forget, do not wait for acknowledgements from the, 1=wait for the leader to acknowledge the writes, -1=wait for the full ISR to acknowledge the writes | +| `kafka.secureConnection` | bool | no | false | Enable secure Kafka. | +| `kafka.rootCAPath` | string | no | *not set | Define root CA path. | +| `kafka.interCAPath` | string | no | *not set | Define inter CA path. | +| `kafka.scramUsername` | string | no | *not set | Define scram username. | +| `kafka.scramPassword` | string | no | *not set | Define scram password. | +| `kafka.metadataTTL` | time.Duration | no | 60s | TTL for the metadata cached by segmentio, increase it to reduce network requests. For more detail please check [docs](https://pkg.go.dev/github.com/segmentio/kafka-go#Transport.MetadataTTL). | +| `kafka.metadataTopics` | []string | no | | Topic names for the metadata cached by segmentio, define topics here that the connector may produce. In large Kafka clusters, this will reduce memory usage. For more detail please check [docs](https://pkg.go.dev/github.com/segmentio/kafka-go#Transport.MetadataTopics). | +| `kafka.clientID` | string | no | | Unique identifier that the transport communicates to the brokers when it sends requests. For more detail please check [docs](https://pkg.go.dev/github.com/segmentio/kafka-go#Transport.ClientID). | +| `kafka.allowAutoTopicCreation` | bool | no | false | Create topic if missing. For more detail please check [docs](https://pkg.go.dev/github.com/segmentio/kafka-go#Writer.AllowAutoTopicCreation). | ### Kafka Metadata Configuration(Use it if you want to store the checkpoint data in Kafka) diff --git a/config/config.go b/config/config.go index 02467f8..0f54232 100644 --- a/config/config.go +++ b/config/config.go @@ -4,10 +4,13 @@ import ( "math" "time" + "github.com/Trendyol/go-dcp/helpers" + "github.com/Trendyol/go-dcp/config" ) type Kafka struct { + ProducerBatchBytes any `yaml:"producerBatchBytes"` CollectionTopicMapping map[string]string `yaml:"collectionTopicMapping"` InterCAPath string `yaml:"interCAPath"` ScramUsername string `yaml:"scramUsername"` @@ -16,9 +19,8 @@ type Kafka struct { ClientID string `yaml:"clientID"` Brokers []string `yaml:"brokers"` MetadataTopics []string `yaml:"metadataTopics"` - ProducerBatchBytes int64 `yaml:"producerBatchBytes"` - ProducerBatchTimeout time.Duration `yaml:"producerBatchTimeout"` ProducerMaxAttempts int `yaml:"producerMaxAttempts"` + ProducerBatchTimeout time.Duration `yaml:"producerBatchTimeout"` ReadTimeout time.Duration `yaml:"readTimeout"` WriteTimeout time.Duration `yaml:"writeTimeout"` RequiredAcks int `yaml:"requiredAcks"` @@ -60,7 +62,7 @@ func (c *Connector) ApplyDefaults() { } if c.Kafka.ProducerBatchBytes == 0 { - c.Kafka.ProducerBatchBytes = 10485760 + c.Kafka.ProducerBatchBytes = helpers.ResolveUnionIntOrStringValue("10mb") } if c.Kafka.RequiredAcks == 0 { diff --git a/kafka/producer/producer.go b/kafka/producer/producer.go index 0a40dac..71e2f57 100644 --- a/kafka/producer/producer.go +++ b/kafka/producer/producer.go @@ -3,6 +3,8 @@ package producer import ( "time" + "github.com/Trendyol/go-dcp/helpers" + "github.com/Trendyol/go-dcp-kafka/config" gKafka "github.com/Trendyol/go-dcp-kafka/kafka" "github.com/Trendyol/go-dcp/models" @@ -29,7 +31,7 @@ func NewProducer(kafkaClient gKafka.Client, config.Kafka.ProducerBatchTickerDuration, writer, config.Kafka.ProducerBatchSize, - config.Kafka.ProducerBatchBytes, + int64(helpers.ResolveUnionIntOrStringValue(config.Kafka.ProducerBatchBytes)), dcpCheckpointCommit, ), }, nil