Skip to content

Commit

Permalink
feat: add sinkResponseHandler (#81)
Browse files Browse the repository at this point in the history
  • Loading branch information
enesyalinkaya authored Jan 5, 2024
1 parent 13715bd commit 953445c
Show file tree
Hide file tree
Showing 6 changed files with 139 additions and 14 deletions.
22 changes: 15 additions & 7 deletions connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func (c *connector) getTopicName(collectionName string, messageTopic string) str
return topic
}

func newConnector(cfg any, mapper Mapper) (Connector, error) {
func newConnector(cfg any, mapper Mapper, sinkResponseHandler kafka.SinkResponseHandler) (Connector, error) {
c, err := newConfig(cfg)
if err != nil {
return nil, err
Expand Down Expand Up @@ -128,7 +128,8 @@ func newConnector(cfg any, mapper Mapper) (Connector, error) {

connector.dcp = dcpClient

connector.producer, err = producer.NewProducer(kafkaClient, c, dcpClient.Commit)
connector.producer, err = producer.NewProducer(kafkaClient, c, dcpClient.Commit, sinkResponseHandler)

if err != nil {
logger.Log.Error("kafka error: %v", err)
return nil, err
Expand Down Expand Up @@ -200,14 +201,16 @@ func newConnectorConfigFromPath(path string) (*config.Connector, error) {
}

type ConnectorBuilder struct {
mapper Mapper
config any
mapper Mapper
config any
sinkResponseHandler kafka.SinkResponseHandler
}

func NewConnectorBuilder(config any) *ConnectorBuilder {
return &ConnectorBuilder{
config: config,
mapper: DefaultMapper,
config: config,
mapper: DefaultMapper,
sinkResponseHandler: nil,
}
}

Expand All @@ -216,8 +219,13 @@ func (c *ConnectorBuilder) SetMapper(mapper Mapper) *ConnectorBuilder {
return c
}

// SetSinkResponseHandler registers the handler whose callbacks are invoked
// after each Kafka publish attempt, and returns the builder for chaining.
func (c *ConnectorBuilder) SetSinkResponseHandler(handler kafka.SinkResponseHandler) *ConnectorBuilder {
	c.sinkResponseHandler = handler
	return c
}

func (c *ConnectorBuilder) Build() (Connector, error) {
return newConnector(c.config, c.mapper)
return newConnector(c.config, c.mapper, c.sinkResponseHandler)
}

func (c *ConnectorBuilder) SetLogger(l *logrus.Logger) *ConnectorBuilder {
Expand Down
30 changes: 26 additions & 4 deletions example/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,21 +39,43 @@ func mapper(event couchbase.Event) []message.KafkaMessage {
}
```

## Step 3: Configuring the Connector
## Step 3: Implementing the SinkResponseHandler

The sink response handler is invoked after each event is published. Its `OnSuccess` and `OnError` callbacks receive a `kafka.SinkResponseHandlerContext`, which wraps the published `message.KafkaMessage` and, on failure, the delivery error.
Here's an example SinkResponseHandler implementation:

```go
type sinkResponseHandler struct {
}

func (s *sinkResponseHandler) OnSuccess(ctx *kafka.SinkResponseHandlerContext) {
fmt.Printf("OnSuccess %v\n", string(ctx.Message.Value))
}

func (s *sinkResponseHandler) OnError(ctx *kafka.SinkResponseHandlerContext) {
fmt.Printf("OnError %v\n", string(ctx.Message.Value))
}
```

## Step 4: Configuring the Connector

The configuration for the connector is provided via a YAML file. Here's an example [configuration](https://github.com/Trendyol/go-dcp-kafka/blob/master/example/config.yml):

You can find an explanation of the available [configurations](https://github.com/Trendyol/go-dcp#configuration).

You can pass this configuration file to the connector by providing the path to the file when creating the connector:
```go
connector, err := dcpkafka.NewConnector("path-to-config.yml", mapper)
connector, err := dcpkafka.NewConnectorBuilder("config.yml").
SetMapper(mapper).
SetSinkResponseHandler(&sinkResponseHandler{}). // if you want to add callback func
Build()

if err != nil {
panic(err)
panic(err)
}
```

## Step 4: Starting and Closing the Connector
## Step 5: Starting and Closing the Connector

Once you have implemented the mapper and configured the connector, you can start/stop the connector:

Expand Down
20 changes: 18 additions & 2 deletions example/simple/main.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package main

import (
"github.com/Trendyol/go-dcp-kafka"
"fmt"
dcpkafka "github.com/Trendyol/go-dcp-kafka"
"github.com/Trendyol/go-dcp-kafka/couchbase"
"github.com/Trendyol/go-dcp-kafka/kafka"
"github.com/Trendyol/go-dcp-kafka/kafka/message"
)

Expand All @@ -17,8 +19,22 @@ func mapper(event couchbase.Event) []message.KafkaMessage {
}
}

// sinkResponseHandler logs the delivery outcome of every published message.
type sinkResponseHandler struct{}

// OnSuccess is invoked for each message that was delivered to Kafka.
func (s *sinkResponseHandler) OnSuccess(ctx *kafka.SinkResponseHandlerContext) {
	value := string(ctx.Message.Value)
	fmt.Printf("OnSuccess %v\n", value)
}

// OnError is invoked for each message that could not be delivered.
func (s *sinkResponseHandler) OnError(ctx *kafka.SinkResponseHandlerContext) {
	value := string(ctx.Message.Value)
	fmt.Printf("OnError %v\n", value)
}

func main() {
c, err := dcpkafka.NewConnectorBuilder("config.yml").SetMapper(mapper).Build()
c, err := dcpkafka.NewConnectorBuilder("config.yml").
SetMapper(mapper).
SetSinkResponseHandler(&sinkResponseHandler{}).
Build()
if err != nil {
panic(err)
}
Expand Down
2 changes: 2 additions & 0 deletions kafka/producer/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type Producer struct {
func NewProducer(kafkaClient gKafka.Client,
config *config.Connector,
dcpCheckpointCommit func(),
sinkResponseHandler gKafka.SinkResponseHandler,
) (Producer, error) {
writer := kafkaClient.Producer()

Expand All @@ -33,6 +34,7 @@ func NewProducer(kafkaClient gKafka.Client,
config.Kafka.ProducerBatchSize,
int64(helpers.ResolveUnionIntOrStringValue(config.Kafka.ProducerBatchBytes)),
dcpCheckpointCommit,
sinkResponseHandler,
),
}, nil
}
Expand Down
64 changes: 63 additions & 1 deletion kafka/producer/producer_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,16 @@ import (
"syscall"
"time"

gKafka "github.com/Trendyol/go-dcp-kafka/kafka"
"github.com/Trendyol/go-dcp-kafka/kafka/message"
"github.com/Trendyol/go-dcp/logger"

"github.com/Trendyol/go-dcp/models"
"github.com/segmentio/kafka-go"
)

type Batch struct {
sinkResponseHandler gKafka.SinkResponseHandler
batchTicker *time.Ticker
Writer *kafka.Writer
dcpCheckpointCommit func()
Expand All @@ -36,6 +39,7 @@ func newBatch(
batchLimit int,
batchBytes int64,
dcpCheckpointCommit func(),
sinkResponseHandler gKafka.SinkResponseHandler,
) *Batch {
batch := &Batch{
batchTickerDuration: batchTime,
Expand All @@ -46,6 +50,7 @@ func newBatch(
batchLimit: batchLimit,
dcpCheckpointCommit: dcpCheckpointCommit,
batchBytes: batchBytes,
sinkResponseHandler: sinkResponseHandler,
}
return batch
}
Expand Down Expand Up @@ -108,15 +113,31 @@ func (b *Batch) FlushMessages() {
if len(b.messages) > 0 {
startedTime := time.Now()
err := b.Writer.WriteMessages(context.Background(), b.messages...)
if err != nil {

if err != nil && b.sinkResponseHandler == nil {
if isFatalError(err) {
panic(fmt.Errorf("permanent error on Kafka side %v", err))
}
logger.Log.Error("batch producer flush error %v", err)
return
}

b.metric.BatchProduceLatency = time.Since(startedTime).Milliseconds()

if b.sinkResponseHandler != nil {
switch e := err.(type) {
case nil:
b.handleResponseSuccess()
case kafka.WriteErrors:
b.handleWriteError(e)
case kafka.MessageTooLargeError:
b.handleMessageTooLargeError(e)
return
default:
logger.Log.Error("batch producer flush error %v", err)
return
}
}
b.messages = b.messages[:0]
b.currentMessageBytes = 0
b.batchTicker.Reset(b.batchTickerDuration)
Expand All @@ -136,3 +157,44 @@ func isFatalError(err error) bool {
}
return true
}

// handleWriteError dispatches per-message results from a kafka.WriteErrors
// value: kafka-go guarantees writeErrors is index-aligned with the batch that
// was written, so entry i reports the outcome of b.messages[i]. A nil entry
// means that message succeeded; a non-nil entry is its delivery error.
func (b *Batch) handleWriteError(writeErrors kafka.WriteErrors) {
	for i, writeErr := range writeErrors {
		ctx := &gKafka.SinkResponseHandlerContext{
			Message: convertKafkaMessage(b.messages[i]),
			Err:     writeErr,
		}
		if writeErr == nil {
			b.sinkResponseHandler.OnSuccess(ctx)
		} else {
			b.sinkResponseHandler.OnError(ctx)
		}
	}
}

// handleResponseSuccess notifies the sink response handler that every message
// in the current batch was written successfully (Err is the zero value, nil).
func (b *Batch) handleResponseSuccess() {
	for i := range b.messages {
		b.sinkResponseHandler.OnSuccess(&gKafka.SinkResponseHandlerContext{
			Message: convertKafkaMessage(b.messages[i]),
		})
	}
}

// handleMessageTooLargeError reports the single oversized message carried by
// a kafka.MessageTooLargeError to the sink response handler as a failure.
func (b *Batch) handleMessageTooLargeError(tooLarge kafka.MessageTooLargeError) {
	ctx := &gKafka.SinkResponseHandlerContext{
		Message: convertKafkaMessage(tooLarge.Message),
		Err:     tooLarge,
	}
	b.sinkResponseHandler.OnError(ctx)
}

// convertKafkaMessage maps a segmentio/kafka-go message onto the connector's
// own message type, copying the fields exposed to sink response handlers.
// Note: Key/Value/Headers are shared, not deep-copied.
func convertKafkaMessage(src kafka.Message) *message.KafkaMessage {
	converted := message.KafkaMessage{
		Topic:   src.Topic,
		Headers: src.Headers,
		Key:     src.Key,
		Value:   src.Value,
	}
	return &converted
}
15 changes: 15 additions & 0 deletions kafka/sink_response_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package kafka

import (
"github.com/Trendyol/go-dcp-kafka/kafka/message"
)

// SinkResponseHandlerContext carries the outcome of a single publish attempt
// into a SinkResponseHandler callback: the message that was written and the
// delivery error (nil when the write succeeded).
type SinkResponseHandlerContext struct {
	Message *message.KafkaMessage
	Err     error
}

// SinkResponseHandler receives a per-message callback after the producer
// attempts to publish a batch to Kafka.
type SinkResponseHandler interface {
	// OnSuccess is invoked for each message that was written successfully.
	OnSuccess(ctx *SinkResponseHandlerContext)
	// OnError is invoked for each message that failed to be written;
	// ctx.Err holds the delivery error.
	OnError(ctx *SinkResponseHandlerContext)
}

0 comments on commit 953445c

Please sign in to comment.