From 00b56981121c4c48720c327a5f1a95af4e4c4f1f Mon Sep 17 00:00:00 2001 From: Mehmet Sezer Date: Sun, 15 Jan 2023 12:31:27 +0300 Subject: [PATCH] Add collection topic map configuration support (#8) --- README.md | 54 +++++++++++++++++++------------------- config/config.go | 24 ++++++++--------- connector.go | 19 +++++++++++--- couchbase/event.go | 38 +++++++++++++++------------ example/config.yml | 4 ++- kafka/producer/producer.go | 6 ++--- 6 files changed, 81 insertions(+), 64 deletions(-) diff --git a/README.md b/README.md index 8fcdb5a..9f255c3 100644 --- a/README.md +++ b/README.md @@ -94,33 +94,33 @@ $ go get github.com/Trendyol/go-kafka-connect-couchbase ### Configuration -| Variable | Type | Is Required | -|-------------------------------------|-----------------------------|-------------| -| `hosts` | array | yes | -| `username` | string | yes | -| `password` | string | yes | -| `bucketName` | string | yes | -| `scopeName` | string | no | -| `collectionNames` | array | no | -| `metadataBucket` | string | no | -| `dcp.group.name` | string | yes | -| `dcp.group.membership.type` | string | yes | -| `dcp.group.membership.memberNumber` | integer | no | -| `dcp.group.membership.totalMembers` | integer | no | -| `kafka.topic` | string | yes | -| `kafka.brokers` | array | yes | -| `kafka.readTimeout` | integer | no | -| `kafka.writeTimeout` | integer | no | -| `kafka.producerBatchSize` | integer | yes | -| `kafka.producerBatchTickerDuration` | integer | yes | -| `kafka.requiredAcks` | integer | no | -| `kafka.secureConnection` | boolean (true/false) | no | -| `kafka.rootCAPath` | string | no | -| `kafka.interCAPath` | string | no | -| `kafka.scramUsername` | string | no | -| `kafka.scramPassword` | string | no | -| `logger.level` | string | no | -| `checkpoint.timeout` | integer | no | +| Variable | Type | Is Required | +|-------------------------------------|----------------------|-------------| +| `hosts` | array | yes | +| `username` | string | yes | +| `password` | string | yes | +| `bucketName` | string | yes | +| `scopeName` | string | no | +| `collectionNames` | array | no | +| `metadataBucket` | string | no | +| `dcp.group.name` | string | yes | +| `dcp.group.membership.type` | string | yes | +| `dcp.group.membership.memberNumber` | integer | no | +| `dcp.group.membership.totalMembers` | integer | no | +| `kafka.collectionTopicMapping` | map[string][string] | yes | +| `kafka.brokers` | array | yes | +| `kafka.readTimeout` | integer | no | +| `kafka.writeTimeout` | integer | no | +| `kafka.producerBatchSize` | integer | yes | +| `kafka.producerBatchTickerDuration` | integer | yes | +| `kafka.requiredAcks` | integer | no | +| `kafka.secureConnection` | boolean (true/false) | no | +| `kafka.rootCAPath` | string | no | +| `kafka.interCAPath` | string | no | +| `kafka.scramUsername` | string | no | +| `kafka.scramPassword` | string | no | +| `logger.level` | string | no | +| `checkpoint.timeout` | integer | no | --- diff --git a/config/config.go b/config/config.go index f303edf..45e6bf0 100644 --- a/config/config.go +++ b/config/config.go @@ -10,18 +10,18 @@ import ( ) type Kafka struct { - Topic string `yaml:"topic"` - InterCAPath string `yaml:"interCAPath"` - ScramUsername string `yaml:"scramUsername"` - ScramPassword string `yaml:"scramPassword"` - RootCAPath string `yaml:"rootCAPath"` - Brokers []string `yaml:"brokers"` - ProducerBatchSize int `yaml:"producerBatchSize"` - ProducerBatchTickerDuration time.Duration `yaml:"producerBatchTickerDuration"` - ReadTimeout time.Duration `yaml:"readTimeout"` - WriteTimeout time.Duration `yaml:"writeTimeout"` - RequiredAcks int `yaml:"requiredAcks"` - SecureConnection bool `yaml:"secureConnection"` + CollectionTopicMapping map[string]string `yaml:"collectionTopicMapping"` + InterCAPath string `yaml:"interCAPath"` + ScramUsername string `yaml:"scramUsername"` + ScramPassword string `yaml:"scramPassword"` + RootCAPath string `yaml:"rootCAPath"` + Brokers []string `yaml:"brokers"` + ProducerBatchSize int `yaml:"producerBatchSize"` + ProducerBatchTickerDuration time.Duration `yaml:"producerBatchTickerDuration"` + ReadTimeout time.Duration `yaml:"readTimeout"` + WriteTimeout time.Duration `yaml:"writeTimeout"` + RequiredAcks int `yaml:"requiredAcks"` + SecureConnection bool `yaml:"secureConnection"` } type Config struct { diff --git a/connector.go b/connector.go index f35385e..fddfab2 100644 --- a/connector.go +++ b/connector.go @@ -45,18 +45,29 @@ func (c *connector) listener(event interface{}, err error) { var e couchbase.Event switch event := event.(type) { case godcpclient.DcpMutation: - e = couchbase.NewMutateEvent(event.Key, event.Value) + e = couchbase.NewMutateEvent(event.Key, event.Value, event.CollectionName) case godcpclient.DcpExpiration: - e = couchbase.NewExpireEvent(event.Key, nil) + e = couchbase.NewExpireEvent(event.Key, nil, event.CollectionName) case godcpclient.DcpDeletion: - e = couchbase.NewDeleteEvent(event.Key, nil) + e = couchbase.NewDeleteEvent(event.Key, nil, event.CollectionName) default: return } if kafkaMessage := c.mapper(e); kafkaMessage != nil { defer message.MessagePool.Put(kafkaMessage) - c.producer.Produce(kafkaMessage.Value, kafkaMessage.Key, kafkaMessage.Headers) + var collectionName string + if e.CollectionName == nil { + collectionName = "_default" + } else { + collectionName = *e.CollectionName + } + topic := c.config.Kafka.CollectionTopicMapping[collectionName] + if topic == "" { + c.errorLogger.Printf("unexpected collection | %s", collectionName) + return + } + c.producer.Produce(kafkaMessage.Value, kafkaMessage.Key, kafkaMessage.Headers, topic) } } diff --git a/couchbase/event.go b/couchbase/event.go index 79cabca..c3c4f70 100644 --- a/couchbase/event.go +++ b/couchbase/event.go @@ -1,33 +1,37 @@ package couchbase type Event struct { - Key []byte - Value []byte - IsDeleted bool - IsExpired bool - IsMutated bool + CollectionName *string + Key []byte + Value []byte + IsDeleted bool + IsExpired bool + IsMutated bool } -func NewDeleteEvent(key []byte, value []byte) Event { +func NewDeleteEvent(key []byte, value []byte, collectionName *string) Event { return Event{ - Key: key, - Value: value, - IsDeleted: true, + Key: key, + Value: value, + IsDeleted: true, + CollectionName: collectionName, } } -func NewExpireEvent(key []byte, value []byte) Event { +func NewExpireEvent(key []byte, value []byte, collectionName *string) Event { return Event{ - Key: key, - Value: value, - IsExpired: true, + Key: key, + Value: value, + IsExpired: true, + CollectionName: collectionName, } } -func NewMutateEvent(key []byte, value []byte) Event { +func NewMutateEvent(key []byte, value []byte, collectionName *string) Event { return Event{ - Key: key, - Value: value, - IsMutated: true, + Key: key, + Value: value, + IsMutated: true, + CollectionName: collectionName, } } diff --git a/example/config.yml b/example/config.yml index c8e4f7b..9fa43b3 100644 --- a/example/config.yml +++ b/example/config.yml @@ -19,7 +19,9 @@ dcp: memberNumber: 1 totalMembers: 1 kafka: - topic: "topicname" + collectionTopicMapping: + _default: topicname + brokers: - broker1 - broker2 diff --git a/kafka/producer/producer.go b/kafka/producer/producer.go index e09343e..3b31778 100644 --- a/kafka/producer/producer.go +++ b/kafka/producer/producer.go @@ -17,7 +17,7 @@ import ( ) type Producer interface { - Produce(message []byte, key []byte, headers map[string]string) + Produce(message []byte, key []byte, headers map[string]string, topic string) Close() error } @@ -27,7 +27,6 @@ type producer struct { func NewProducer(config *config.Kafka, logger logger.Logger, errorLogger logger.Logger) Producer { writer := &kafka.Writer{ - Topic: config.Topic, Addr: kafka.TCP(config.Brokers...), Balancer: &kafka.Hash{}, BatchSize: config.ProducerBatchSize, @@ -96,11 +95,12 @@ var KafkaMessagePool = sync.Pool{ }, } -func (a *producer) Produce(message []byte, key []byte, headers map[string]string) { +func (a *producer) Produce(message []byte, key []byte, headers map[string]string, topic string) { msg := KafkaMessagePool.Get().(*kafka.Message) msg.Key = key msg.Value = message msg.Headers = newHeaders(headers) + msg.Topic = topic a.producerBatch.messageChn <- msg }