Skip to content

Commit

Permalink
add error return for new connection init (#31)
Browse files Browse the repository at this point in the history
* add error return for new connection init

* add error return for new producer init

* update newConnector method returnings
  • Loading branch information
halilkocaoz authored Mar 21, 2023
1 parent 0ac05db commit a72c551
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 10 deletions.
16 changes: 11 additions & 5 deletions connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,15 +61,15 @@ func (c *connector) listener(ctx *models.ListenerContext) {
}
}

func NewConnector(configPath string, mapper Mapper) Connector {
func NewConnector(configPath string, mapper Mapper) (Connector, error) {
return newConnector(configPath, mapper, &logger.Log, &logger.Log)
}

func NewConnectorWithLoggers(configPath string, mapper Mapper, logger logger.Logger, errorLogger logger.Logger) Connector {
func NewConnectorWithLoggers(configPath string, mapper Mapper, logger logger.Logger, errorLogger logger.Logger) (Connector, error) {
return newConnector(configPath, mapper, logger, errorLogger)
}

func newConnector(configPath string, mapper Mapper, logger logger.Logger, errorLogger logger.Logger) Connector {
func newConnector(configPath string, mapper Mapper, logger logger.Logger, errorLogger logger.Logger) (Connector, error) {
c := config.NewConfig("cbgokafka", configPath, errorLogger)

connector := &connector{
Expand All @@ -82,8 +82,14 @@ func newConnector(configPath string, mapper Mapper, logger logger.Logger, errorL
dcp, err := godcpclient.NewDcp(configPath, connector.listener)
if err != nil {
connector.errorLogger.Printf("Dcp error: %v", err)
return nil, err
}

connector.dcp = dcp
connector.producer = kafka.NewProducer(c.Kafka, connector.logger, connector.errorLogger, dcp.Commit)
return connector
connector.producer, err = kafka.NewProducer(c.Kafka, connector.logger, connector.errorLogger, dcp.Commit)
if err != nil {
connector.errorLogger.Printf("Kafka error: %v", err)
return nil, err
}
return connector, nil
}
6 changes: 4 additions & 2 deletions example/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@ func mapper(event couchbase.Event) *message.KafkaMessage {
}

func main() {
connector := gokafkaconnectcouchbase.NewConnector("./example/config.yml", mapper)

connector, err := gokafkaconnectcouchbase.NewConnector("./example/config.yml", mapper)
if err != nil {
panic("New connector is could not initialized: " + err.Error())
}
defer connector.Close()

connector.Start()
Expand Down
8 changes: 5 additions & 3 deletions kafka/producer/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type producer struct {
producerBatch *producerBatch
}

func NewProducer(config *config.Kafka, logger logger.Logger, errorLogger logger.Logger, dcpCheckpointCommit func()) Producer {
func NewProducer(config *config.Kafka, logger logger.Logger, errorLogger logger.Logger, dcpCheckpointCommit func()) (Producer, error) {
writer := &kafka.Writer{
Addr: kafka.TCP(config.Brokers...),
Balancer: &kafka.Hash{},
Expand All @@ -41,14 +41,16 @@ func NewProducer(config *config.Kafka, logger logger.Logger, errorLogger logger.
ErrorLogger: errorLogger,
Compression: kafka.Compression(config.GetCompression()),
}

if config.SecureConnection {
transport, err := createSecureKafkaTransport(config.ScramUsername, config.ScramPassword, config.RootCAPath,
config.InterCAPath, errorLogger)
if err != nil {
panic("Secure kafka couldn't connect " + err.Error())
return nil, err
}
writer.Transport = transport
}

return &producer{
producerBatch: newProducerBatch(
config.ProducerBatchTickerDuration,
Expand All @@ -57,7 +59,7 @@ func NewProducer(config *config.Kafka, logger logger.Logger, errorLogger logger.
logger,
errorLogger,
dcpCheckpointCommit),
}
}, nil
}

func createSecureKafkaTransport(
Expand Down

0 comments on commit a72c551

Please sign in to comment.