Skip to content

Commit

Permalink
Add collection topic map configuration support (#8)
Browse files Browse the repository at this point in the history
  • Loading branch information
mhmtszr authored Jan 15, 2023
1 parent 94e61c7 commit 00b5698
Show file tree
Hide file tree
Showing 6 changed files with 81 additions and 64 deletions.
54 changes: 27 additions & 27 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -94,33 +94,33 @@ $ go get github.com/Trendyol/go-kafka-connect-couchbase

### Configuration

| Variable | Type | Is Required |
|-------------------------------------|-----------------------------|-------------|
| `hosts` | array | yes |
| `username` | string | yes |
| `password` | string | yes |
| `bucketName` | string | yes |
| `scopeName` | string | no |
| `collectionNames` | array | no |
| `metadataBucket` | string | no |
| `dcp.group.name` | string | yes |
| `dcp.group.membership.type` | string | yes |
| `dcp.group.membership.memberNumber` | integer | no |
| `dcp.group.membership.totalMembers` | integer | no |
| `kafka.topic` | string | yes |
| `kafka.brokers` | array | yes |
| `kafka.readTimeout` | integer | no |
| `kafka.writeTimeout` | integer | no |
| `kafka.producerBatchSize` | integer | yes |
| `kafka.producerBatchTickerDuration` | integer | yes |
| `kafka.requiredAcks` | integer | no |
| `kafka.secureConnection` | boolean (true/false) | no |
| `kafka.rootCAPath` | string | no |
| `kafka.interCAPath` | string | no |
| `kafka.scramUsername` | string | no |
| `kafka.scramPassword` | string | no |
| `logger.level` | string | no |
| `checkpoint.timeout` | integer | no |
| Variable | Type | Is Required |
|-------------------------------------|----------------------|-------------|
| `hosts` | array | yes |
| `username` | string | yes |
| `password` | string | yes |
| `bucketName` | string | yes |
| `scopeName` | string | no |
| `collectionNames` | array | no |
| `metadataBucket` | string | no |
| `dcp.group.name` | string | yes |
| `dcp.group.membership.type` | string | yes |
| `dcp.group.membership.memberNumber` | integer | no |
| `dcp.group.membership.totalMembers` | integer | no |
| `kafka.collectionTopicMapping` | map[string][string] | yes |
| `kafka.brokers` | array | yes |
| `kafka.readTimeout` | integer | no |
| `kafka.writeTimeout` | integer | no |
| `kafka.producerBatchSize` | integer | yes |
| `kafka.producerBatchTickerDuration` | integer | yes |
| `kafka.requiredAcks` | integer | no |
| `kafka.secureConnection` | boolean (true/false) | no |
| `kafka.rootCAPath` | string | no |
| `kafka.interCAPath` | string | no |
| `kafka.scramUsername` | string | no |
| `kafka.scramPassword` | string | no |
| `logger.level` | string | no |
| `checkpoint.timeout` | integer | no |

---

Expand Down
24 changes: 12 additions & 12 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,18 @@ import (
)

type Kafka struct {
Topic string `yaml:"topic"`
InterCAPath string `yaml:"interCAPath"`
ScramUsername string `yaml:"scramUsername"`
ScramPassword string `yaml:"scramPassword"`
RootCAPath string `yaml:"rootCAPath"`
Brokers []string `yaml:"brokers"`
ProducerBatchSize int `yaml:"producerBatchSize"`
ProducerBatchTickerDuration time.Duration `yaml:"producerBatchTickerDuration"`
ReadTimeout time.Duration `yaml:"readTimeout"`
WriteTimeout time.Duration `yaml:"writeTimeout"`
RequiredAcks int `yaml:"requiredAcks"`
SecureConnection bool `yaml:"secureConnection"`
CollectionTopicMapping map[string]string `yaml:"collectionTopicMapping"`
InterCAPath string `yaml:"interCAPath"`
ScramUsername string `yaml:"scramUsername"`
ScramPassword string `yaml:"scramPassword"`
RootCAPath string `yaml:"rootCAPath"`
Brokers []string `yaml:"brokers"`
ProducerBatchSize int `yaml:"producerBatchSize"`
ProducerBatchTickerDuration time.Duration `yaml:"producerBatchTickerDuration"`
ReadTimeout time.Duration `yaml:"readTimeout"`
WriteTimeout time.Duration `yaml:"writeTimeout"`
RequiredAcks int `yaml:"requiredAcks"`
SecureConnection bool `yaml:"secureConnection"`
}

type Config struct {
Expand Down
19 changes: 15 additions & 4 deletions connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,18 +45,29 @@ func (c *connector) listener(event interface{}, err error) {
var e couchbase.Event
switch event := event.(type) {
case godcpclient.DcpMutation:
e = couchbase.NewMutateEvent(event.Key, event.Value)
e = couchbase.NewMutateEvent(event.Key, event.Value, event.CollectionName)
case godcpclient.DcpExpiration:
e = couchbase.NewExpireEvent(event.Key, nil)
e = couchbase.NewExpireEvent(event.Key, nil, event.CollectionName)
case godcpclient.DcpDeletion:
e = couchbase.NewDeleteEvent(event.Key, nil)
e = couchbase.NewDeleteEvent(event.Key, nil, event.CollectionName)
default:
return
}

if kafkaMessage := c.mapper(e); kafkaMessage != nil {
defer message.MessagePool.Put(kafkaMessage)
c.producer.Produce(kafkaMessage.Value, kafkaMessage.Key, kafkaMessage.Headers)
var collectionName string
if e.CollectionName == nil {
collectionName = "_default"
} else {
collectionName = *e.CollectionName
}
topic := c.config.Kafka.CollectionTopicMapping[collectionName]
if topic == "" {
c.errorLogger.Printf("unexpected collection | %s", collectionName)
return
}
c.producer.Produce(kafkaMessage.Value, kafkaMessage.Key, kafkaMessage.Headers, topic)
}
}

Expand Down
38 changes: 21 additions & 17 deletions couchbase/event.go
Original file line number Diff line number Diff line change
@@ -1,33 +1,37 @@
package couchbase

type Event struct {
Key []byte
Value []byte
IsDeleted bool
IsExpired bool
IsMutated bool
CollectionName *string
Key []byte
Value []byte
IsDeleted bool
IsExpired bool
IsMutated bool
}

func NewDeleteEvent(key []byte, value []byte) Event {
func NewDeleteEvent(key []byte, value []byte, collectionName *string) Event {
return Event{
Key: key,
Value: value,
IsDeleted: true,
Key: key,
Value: value,
IsDeleted: true,
CollectionName: collectionName,
}
}

func NewExpireEvent(key []byte, value []byte) Event {
func NewExpireEvent(key []byte, value []byte, collectionName *string) Event {
return Event{
Key: key,
Value: value,
IsExpired: true,
Key: key,
Value: value,
IsExpired: true,
CollectionName: collectionName,
}
}

func NewMutateEvent(key []byte, value []byte) Event {
func NewMutateEvent(key []byte, value []byte, collectionName *string) Event {
return Event{
Key: key,
Value: value,
IsMutated: true,
Key: key,
Value: value,
IsMutated: true,
CollectionName: collectionName,
}
}
4 changes: 3 additions & 1 deletion example/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ dcp:
memberNumber: 1
totalMembers: 1
kafka:
topic: "topicname"
collectionTopicMapping:
_default: topicname

brokers:
- broker1
- broker2
Expand Down
6 changes: 3 additions & 3 deletions kafka/producer/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
)

type Producer interface {
Produce(message []byte, key []byte, headers map[string]string)
Produce(message []byte, key []byte, headers map[string]string, topic string)
Close() error
}

Expand All @@ -27,7 +27,6 @@ type producer struct {

func NewProducer(config *config.Kafka, logger logger.Logger, errorLogger logger.Logger) Producer {
writer := &kafka.Writer{
Topic: config.Topic,
Addr: kafka.TCP(config.Brokers...),
Balancer: &kafka.Hash{},
BatchSize: config.ProducerBatchSize,
Expand Down Expand Up @@ -96,11 +95,12 @@ var KafkaMessagePool = sync.Pool{
},
}

func (a *producer) Produce(message []byte, key []byte, headers map[string]string) {
func (a *producer) Produce(message []byte, key []byte, headers map[string]string, topic string) {
msg := KafkaMessagePool.Get().(*kafka.Message)
msg.Key = key
msg.Value = message
msg.Headers = newHeaders(headers)
msg.Topic = topic
a.producerBatch.messageChn <- msg
}

Expand Down

0 comments on commit 00b5698

Please sign in to comment.