diff --git a/config/config.go b/config/config.go index e118451..bcf8117 100644 --- a/config/config.go +++ b/config/config.go @@ -1,10 +1,13 @@ package config import ( + "errors" "math" "strconv" "time" + "github.com/Trendyol/go-dcp/logger" + "github.com/Trendyol/go-dcp/config" "github.com/Trendyol/go-dcp/helpers" "github.com/segmentio/kafka-go" @@ -50,13 +53,17 @@ func (k *Kafka) GetBalancer() kafka.Balancer { case "Murmur2Balancer": return kafka.Murmur2Balancer{} default: - panic("invalid kafka balancer method, given: " + k.Balancer) + err := errors.New("invalid kafka balancer method, given: " + k.Balancer) + logger.Log.Error("error while get kafka balancer, err: %v", err) + panic(err) } } func (k *Kafka) GetCompression() int8 { if k.Compression < 0 || k.Compression > 4 { - panic("invalid kafka compression method, given: " + strconv.Itoa(int(k.Compression))) + err := errors.New("invalid kafka compression method, given: " + strconv.Itoa(int(k.Compression))) + logger.Log.Error("error while get kafka compression, err: %v", err) + panic(err) } return k.Compression } diff --git a/connector.go b/connector.go index e74e5c7..55dab73 100644 --- a/connector.go +++ b/connector.go @@ -123,7 +123,12 @@ func (c *connector) getTopicName(collectionName string, messageTopic string) str topic := c.config.Kafka.CollectionTopicMapping[collectionName] if topic == "" { - panic(fmt.Sprintf("there is no topic mapping for collection: %s on your configuration", collectionName)) + err := fmt.Errorf( + "there is no topic mapping for collection: %s on your configuration", + collectionName, + ) + logger.Log.Error("error while get topic name, err: %v", err) + panic(err) } return topic } diff --git a/docker-compose.yml b/docker-compose.yml index 6e7b8fe..4579ff9 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -7,41 +7,49 @@ services: - "8091:8091" - "11210:11210" healthcheck: - test: [ "CMD", "curl", "-f", "http://user:123456@localhost:8091/pools/default/buckets/dcp-test" ] + test: [ "CMD", "curl", "-f", "http://user:password@localhost:8091/pools/default/buckets/dcp-test" ] interval: 2s timeout: 3s retries: 60 - redpanda: - image: docker.redpanda.com/redpandadata/redpanda - container_name: redpanda-1 - command: - - redpanda - - start - - --smp - - '1' - - --reserve-memory - - 0M - - --overprovisioned - - --node-id - - '0' - - --kafka-addr - - PLAINTEXT://0.0.0.0:29092,OUTSIDE://0.0.0.0:9092 - - --advertise-kafka-addr - - PLAINTEXT://redpanda:29092,OUTSIDE://localhost:9092 - - --pandaproxy-addr - - PLAINTEXT://0.0.0.0:28082,OUTSIDE://0.0.0.0:8082 - - --advertise-pandaproxy-addr - - PLAINTEXT://redpanda:28082,OUTSIDE://localhost:8082 + zookeeper: + image: confluentinc/cp-zookeeper:7.6.1 + container_name: zookeeper + environment: + ZOOKEEPER_CLIENT_PORT: 2181 + ZOOKEEPER_TICK_TIME: 2000 + broker: + image: confluentinc/cp-kafka:7.6.1 + container_name: broker ports: - - 8081:8081 - - 8082:8082 - - 9092:9092 - - 28082:28082 - - 29092:29092 + - "9092:9092" + depends_on: + - zookeeper + environment: + KAFKA_BROKER_ID: 1 + KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181' + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_INTERNAL://broker:29092 + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 + KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 + KAFKA_AUTO_CREATE_TOPICS_ENABLE: true + kowl: + image: quay.io/cloudhut/kowl:master-59f68da + container_name: kowl + depends_on: + - broker + ports: + - "8081:8080" + environment: + KAFKA_BROKERS: 'broker:29092' + healthcheck: + test: wget --no-verbose --tries=1 --spider http://localhost:8080/api/topics/topicname/partitions || exit 1 + interval: 2s + timeout: 3s + retries: 60 redpanda-topic-create: - image: docker.redpanda.com/redpandadata/redpanda + image: docker.redpanda.com/redpandadata/redpanda:v24.2.8 container_name: redpanda-topic-create - entrypoint: [ "bash", "-c", "sleep 5 && rpk topic create test --brokers redpanda:29092" ] depends_on: - - redpanda - restart: "no" + - kowl + entrypoint: [ "bash", "-c", "sleep 10 && rpk topic create topicname -c max.message.bytes=10485760 --brokers broker:29092" ] \ No newline at end of file diff --git a/example/simple/main.go b/example/simple/main.go index dcbeac9..6d7db15 100644 --- a/example/simple/main.go +++ b/example/simple/main.go @@ -23,11 +23,11 @@ type sinkResponseHandler struct { } func (s *sinkResponseHandler) OnSuccess(ctx *kafka.SinkResponseHandlerContext) { - fmt.Printf("OnSuccess %v\n", string(ctx.Message.Value)) + fmt.Printf("OnSuccess Key: %v, Len: %v\n", string(ctx.Message.Key), len(ctx.Message.Value)) } func (s *sinkResponseHandler) OnError(ctx *kafka.SinkResponseHandlerContext) { - fmt.Printf("OnError %v\n", string(ctx.Message.Value)) + fmt.Printf("OnError Key: %v, Len: %v, Err: %v\n", string(ctx.Message.Key), len(ctx.Message.Value), ctx.Err) } func main() { diff --git a/kafka/client.go b/kafka/client.go index 69903b3..35d4350 100644 --- a/kafka/client.go +++ b/kafka/client.go @@ -300,6 +300,7 @@ func NewClient(config *config.Connector) Client { config.Kafka.InterCA, ) if err != nil { + logger.Log.Error("error while creating new tls content, err: %v", err) panic(err) } diff --git a/test/couchbase/Dockerfile b/test/couchbase/Dockerfile index 2ab3ddb..d7b9015 100644 --- a/test/couchbase/Dockerfile +++ b/test/couchbase/Dockerfile @@ -1,4 +1,4 @@ -FROM couchbase:community-7.6.1 +FROM couchbase:7.6.3 ADD configure.sh /configure.sh RUN chmod +x /configure.sh diff --git a/test/couchbase/configure.sh b/test/couchbase/configure.sh index 7f8a842..5b3812f 100644 --- a/test/couchbase/configure.sh +++ b/test/couchbase/configure.sh @@ -30,11 +30,12 @@ until [[ $(check_db) = 0 ]]; do done couchbase-cli cluster-init -c localhost --cluster-name Cluster --cluster-username user \ - --cluster-password 123456 --services data --cluster-ramsize 1024 + --cluster-password password --services data --cluster-ramsize 1024 -couchbase-cli bucket-create -c couchbase --username user --password 123456 --bucket dcp-test --bucket-type couchbase --bucket-ramsize 1024 +couchbase-cli bucket-create -c couchbase --username user --password password --bucket dcp-test --bucket-type couchbase --bucket-ramsize 768 +couchbase-cli bucket-create -c couchbase --username user --password password --bucket checkpoint-bucket-name --bucket-type couchbase --bucket-ramsize 256 -cbimport json -c couchbase://127.0.0.1 -u user -p 123456 --bucket-quota 1024 -b dcp-test -d file://opt/couchbase/samples/travel-sample.zip -f sample +cbimport json -c couchbase://127.0.0.1 -u user -p password --bucket-quota 768 -b dcp-test -d file://opt/couchbase/samples/travel-sample.zip -f sample echo "couchbase-dev started" diff --git a/test/integration/config.yml b/test/integration/config.yml index b4128ed..0aee836 100644 --- a/test/integration/config.yml +++ b/test/integration/config.yml @@ -1,7 +1,7 @@ hosts: - localhost:8091 username: user -password: 123456 +password: password bucketName: dcp-test rollbackMitigation: disabled: true @@ -21,7 +21,7 @@ metadata: collection: _default kafka: collectionTopicMapping: - _default: test + _default: topicname brokers: - "localhost:9092" readTimeout: 30s @@ -31,4 +31,4 @@ kafka: producerBatchTickerDuration: 5s metadataTTL: 2400s metadataTopics: - - "test" \ No newline at end of file + - "topicname" \ No newline at end of file