Skip to content

Commit

Permalink
Merge pull request #1695 from zendesk/ktsanaktsidis/docker_compose_tests
Browse files Browse the repository at this point in the history
Use docker-compose to run the functional tests
  • Loading branch information
bai authored Jun 24, 2020
2 parents 6523153 + cfd5999 commit ad70aac
Show file tree
Hide file tree
Showing 21 changed files with 524 additions and 335 deletions.
14 changes: 1 addition & 13 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,8 @@ jobs:
platform: [ubuntu-latest]

env:
KAFKA_PEERS: localhost:9091,localhost:9092,localhost:9093,localhost:9094,localhost:9095
TOXIPROXY_ADDR: http://localhost:8474
KAFKA_INSTALL_ROOT: /home/runner/kafka
KAFKA_HOSTNAME: localhost
DEBUG: true
KAFKA_VERSION: ${{ matrix.kafka-version }}
KAFKA_SCALA_VERSION: 2.12

steps:
- uses: actions/checkout@v1
Expand Down Expand Up @@ -48,16 +43,9 @@ jobs:
run: |
curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(go env GOPATH)/bin v1.27.0
export REPOSITORY_ROOT=${GITHUB_WORKSPACE}
vagrant/install_cluster.sh
vagrant/boot_cluster.sh
vagrant/create_topics.sh
vagrant/run_java_producer.sh
- name: Run test suite
run: make test
run: make test_functional

- name: Run linter
run: make lint

- name: Teardown
run: vagrant/halt_cluster.sh
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,6 @@ _testmain.go

coverage.txt
profile.out

simplest-uncommitted-msg-0.1-jar-with-dependencies.jar

6 changes: 5 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,11 @@ fmt:
gofmt -s -l -w $(FILES) $(TESTS)

lint:
golangci-lint run
GOFLAGS="-tags=functional" golangci-lint run

test:
$(GOTEST) ./...

.PHONY: test_functional
test_functional:
$(GOTEST) -tags=functional ./...
134 changes: 134 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
version: '3.7'
services:
zookeeper-1:
image: 'confluentinc/cp-zookeeper:${CONFLUENT_PLATFORM_VERSION:-5.5.0}'
restart: always
environment:
ZOOKEEPER_SERVER_ID: '1'
ZOOKEEPER_SERVERS: 'zookeeper-1:2888:3888;zookeeper-2:2888:3888;zookeeper-3:2888:3888'
ZOOKEEPER_CLIENT_PORT: '2181'
ZOOKEEPER_PEER_PORT: '2888'
ZOOKEEPER_LEADER_PORT: '3888'
ZOOKEEPER_INIT_LIMIT: '10'
ZOOKEEPER_SYNC_LIMIT: '5'
ZOOKEEPER_MAX_CLIENT_CONNS: '0'
zookeeper-2:
image: 'confluentinc/cp-zookeeper:${CONFLUENT_PLATFORM_VERSION:-5.5.0}'
restart: always
environment:
ZOOKEEPER_SERVER_ID: '2'
ZOOKEEPER_SERVERS: 'zookeeper-1:2888:3888;zookeeper-2:2888:3888;zookeeper-3:2888:3888'
ZOOKEEPER_CLIENT_PORT: '2181'
ZOOKEEPER_PEER_PORT: '2888'
ZOOKEEPER_LEADER_PORT: '3888'
ZOOKEEPER_INIT_LIMIT: '10'
ZOOKEEPER_SYNC_LIMIT: '5'
ZOOKEEPER_MAX_CLIENT_CONNS: '0'
zookeeper-3:
image: 'confluentinc/cp-zookeeper:${CONFLUENT_PLATFORM_VERSION:-5.5.0}'
restart: always
environment:
ZOOKEEPER_SERVER_ID: '3'
ZOOKEEPER_SERVERS: 'zookeeper-1:2888:3888;zookeeper-2:2888:3888;zookeeper-3:2888:3888'
ZOOKEEPER_CLIENT_PORT: '2181'
ZOOKEEPER_PEER_PORT: '2888'
ZOOKEEPER_LEADER_PORT: '3888'
ZOOKEEPER_INIT_LIMIT: '10'
ZOOKEEPER_SYNC_LIMIT: '5'
ZOOKEEPER_MAX_CLIENT_CONNS: '0'
kafka-1:
image: 'confluentinc/cp-kafka:${CONFLUENT_PLATFORM_VERSION:-5.5.0}'
restart: always
environment:
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181'
KAFKA_LISTENERS: 'LISTENER_INTERNAL://:9091,LISTENER_LOCAL://:29091'
KAFKA_ADVERTISED_LISTENERS: 'LISTENER_INTERNAL://kafka-1:9091,LISTENER_LOCAL://localhost:29091'
KAFKA_INTER_BROKER_LISTENER_NAME: 'LISTENER_INTERNAL'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'LISTENER_INTERNAL:PLAINTEXT,LISTENER_LOCAL:PLAINTEXT'
KAFKA_DEFAULT_REPLICATION_FACTOR: '2'
KAFKA_BROKER_ID: '1'
KAFKA_BROKER_RACK: '1'
KAFKA_ZOOKEEPER_SESSION_TIMEOUT_MS: '3000'
KAFKA_ZOOKEEPER_CONNECTION_TIMEOUT_MS: '3000'
KAFKA_REPLICA_SELECTOR_CLASS: 'org.apache.kafka.common.replica.RackAwareReplicaSelector'
KAFKA_DELETE_TOPIC_ENABLE: 'true'
KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'false'
kafka-2:
image: 'confluentinc/cp-kafka:${CONFLUENT_PLATFORM_VERSION:-5.5.0}'
restart: always
environment:
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181'
KAFKA_LISTENERS: 'LISTENER_INTERNAL://:9091,LISTENER_LOCAL://:29092'
KAFKA_ADVERTISED_LISTENERS: 'LISTENER_INTERNAL://kafka-2:9091,LISTENER_LOCAL://localhost:29092'
KAFKA_INTER_BROKER_LISTENER_NAME: 'LISTENER_INTERNAL'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'LISTENER_INTERNAL:PLAINTEXT,LISTENER_LOCAL:PLAINTEXT'
KAFKA_DEFAULT_REPLICATION_FACTOR: '2'
KAFKA_BROKER_ID: '2'
KAFKA_BROKER_RACK: '2'
KAFKA_ZOOKEEPER_SESSION_TIMEOUT_MS: '3000'
KAFKA_ZOOKEEPER_CONNECTION_TIMEOUT_MS: '3000'
KAFKA_REPLICA_SELECTOR_CLASS: 'org.apache.kafka.common.replica.RackAwareReplicaSelector'
KAFKA_DELETE_TOPIC_ENABLE: 'true'
KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'false'
kafka-3:
image: 'confluentinc/cp-kafka:${CONFLUENT_PLATFORM_VERSION:-5.5.0}'
restart: always
environment:
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181'
KAFKA_LISTENERS: 'LISTENER_INTERNAL://:9091,LISTENER_LOCAL://:29093'
KAFKA_ADVERTISED_LISTENERS: 'LISTENER_INTERNAL://kafka-3:9091,LISTENER_LOCAL://localhost:29093'
KAFKA_INTER_BROKER_LISTENER_NAME: 'LISTENER_INTERNAL'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'LISTENER_INTERNAL:PLAINTEXT,LISTENER_LOCAL:PLAINTEXT'
KAFKA_DEFAULT_REPLICATION_FACTOR: '2'
KAFKA_BROKER_ID: '3'
KAFKA_BROKER_RACK: '3'
KAFKA_ZOOKEEPER_SESSION_TIMEOUT_MS: '3000'
KAFKA_ZOOKEEPER_CONNECTION_TIMEOUT_MS: '3000'
KAFKA_REPLICA_SELECTOR_CLASS: 'org.apache.kafka.common.replica.RackAwareReplicaSelector'
KAFKA_DELETE_TOPIC_ENABLE: 'true'
KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'false'
kafka-4:
image: 'confluentinc/cp-kafka:${CONFLUENT_PLATFORM_VERSION:-5.5.0}'
restart: always
environment:
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181'
KAFKA_LISTENERS: 'LISTENER_INTERNAL://:9091,LISTENER_LOCAL://:29094'
KAFKA_ADVERTISED_LISTENERS: 'LISTENER_INTERNAL://kafka-4:9091,LISTENER_LOCAL://localhost:29094'
KAFKA_INTER_BROKER_LISTENER_NAME: 'LISTENER_INTERNAL'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'LISTENER_INTERNAL:PLAINTEXT,LISTENER_LOCAL:PLAINTEXT'
KAFKA_DEFAULT_REPLICATION_FACTOR: '2'
KAFKA_BROKER_ID: '4'
KAFKA_BROKER_RACK: '4'
KAFKA_ZOOKEEPER_SESSION_TIMEOUT_MS: '3000'
KAFKA_ZOOKEEPER_CONNECTION_TIMEOUT_MS: '3000'
KAFKA_REPLICA_SELECTOR_CLASS: 'org.apache.kafka.common.replica.RackAwareReplicaSelector'
KAFKA_DELETE_TOPIC_ENABLE: 'true'
KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'false'
kafka-5:
image: 'confluentinc/cp-kafka:${CONFLUENT_PLATFORM_VERSION:-5.5.0}'
restart: always
environment:
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181'
KAFKA_LISTENERS: 'LISTENER_INTERNAL://:9091,LISTENER_LOCAL://:29095'
KAFKA_ADVERTISED_LISTENERS: 'LISTENER_INTERNAL://kafka-5:9091,LISTENER_LOCAL://localhost:29095'
KAFKA_INTER_BROKER_LISTENER_NAME: 'LISTENER_INTERNAL'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'LISTENER_INTERNAL:PLAINTEXT,LISTENER_LOCAL:PLAINTEXT'
KAFKA_DEFAULT_REPLICATION_FACTOR: '2'
KAFKA_BROKER_ID: '5'
KAFKA_BROKER_RACK: '5'
KAFKA_ZOOKEEPER_SESSION_TIMEOUT_MS: '3000'
KAFKA_ZOOKEEPER_CONNECTION_TIMEOUT_MS: '3000'
KAFKA_REPLICA_SELECTOR_CLASS: 'org.apache.kafka.common.replica.RackAwareReplicaSelector'
KAFKA_DELETE_TOPIC_ENABLE: 'true'
KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'false'
toxiproxy:
image: 'shopify/toxiproxy:2.1.4'
ports:
# The tests themselves actually start the proies on these ports
- '29091:29091'
- '29092:29092'
- '29093:29093'
- '29094:29094'
- '29095:29095'
# This is the toxiproxy API port
- '8474:8474'
10 changes: 6 additions & 4 deletions functional_client_test.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
//+build functional

package sarama

import (
Expand All @@ -10,13 +12,13 @@ func TestFuncConnectionFailure(t *testing.T) {
setupFunctionalTest(t)
defer teardownFunctionalTest(t)

Proxies["kafka1"].Enabled = false
FunctionalTestEnv.Proxies["kafka1"].Enabled = false
SaveProxy(t, "kafka1")

config := NewConfig()
config.Metadata.Retry.Max = 1

_, err := NewClient([]string{kafkaBrokers[0]}, config)
_, err := NewClient([]string{FunctionalTestEnv.KafkaBrokerAddrs[0]}, config)
if err != ErrOutOfBrokers {
t.Fatal("Expected returned error to be ErrOutOfBrokers, but was: ", err)
}
Expand All @@ -29,7 +31,7 @@ func TestFuncClientMetadata(t *testing.T) {
config := NewConfig()
config.Metadata.Retry.Max = 1
config.Metadata.Retry.Backoff = 10 * time.Millisecond
client, err := NewClient(kafkaBrokers, config)
client, err := NewClient(FunctionalTestEnv.KafkaBrokerAddrs, config)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -70,7 +72,7 @@ func TestFuncClientCoordinator(t *testing.T) {
setupFunctionalTest(t)
defer teardownFunctionalTest(t)

client, err := NewClient(kafkaBrokers, nil)
client, err := NewClient(FunctionalTestEnv.KafkaBrokerAddrs, nil)
if err != nil {
t.Fatal(err)
}
Expand Down
6 changes: 3 additions & 3 deletions functional_consumer_group_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// +build go1.9
//+build functional

package sarama

Expand Down Expand Up @@ -153,7 +153,7 @@ func testFuncConsumerGroupID(t *testing.T) string {
}

func testFuncConsumerGroupFuzzySeed(topic string) error {
client, err := NewClient(kafkaBrokers, nil)
client, err := NewClient(FunctionalTestEnv.KafkaBrokerAddrs, nil)
if err != nil {
return err
}
Expand Down Expand Up @@ -245,7 +245,7 @@ func runTestFuncConsumerGroupMember(t *testing.T, groupID, clientID string, maxM
config.Consumer.Offsets.Initial = OffsetOldest
config.Consumer.Group.Rebalance.Timeout = 10 * time.Second

group, err := NewConsumerGroup(kafkaBrokers, groupID, config)
group, err := NewConsumerGroup(FunctionalTestEnv.KafkaBrokerAddrs, groupID, config)
if err != nil {
t.Fatal(err)
return nil
Expand Down
14 changes: 8 additions & 6 deletions functional_consumer_test.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
//+build functional

package sarama

import (
Expand All @@ -16,7 +18,7 @@ func TestFuncConsumerOffsetOutOfRange(t *testing.T) {
setupFunctionalTest(t)
defer teardownFunctionalTest(t)

consumer, err := NewConsumer(kafkaBrokers, nil)
consumer, err := NewConsumer(FunctionalTestEnv.KafkaBrokerAddrs, nil)
if err != nil {
t.Fatal(err)
}
Expand All @@ -36,7 +38,7 @@ func TestConsumerHighWaterMarkOffset(t *testing.T) {
setupFunctionalTest(t)
defer teardownFunctionalTest(t)

p, err := NewSyncProducer(kafkaBrokers, nil)
p, err := NewSyncProducer(FunctionalTestEnv.KafkaBrokerAddrs, nil)
if err != nil {
t.Fatal(err)
}
Expand All @@ -47,7 +49,7 @@ func TestConsumerHighWaterMarkOffset(t *testing.T) {
t.Fatal(err)
}

c, err := NewConsumer(kafkaBrokers, nil)
c, err := NewConsumer(FunctionalTestEnv.KafkaBrokerAddrs, nil)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -143,7 +145,7 @@ func TestReadOnlyAndAllCommittedMessages(t *testing.T) {
config.Consumer.IsolationLevel = ReadCommitted
config.Version = V0_11_0_0

consumer, err := NewConsumer(kafkaBrokers, config)
consumer, err := NewConsumer(FunctionalTestEnv.KafkaBrokerAddrs, config)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -205,7 +207,7 @@ func produceMsgs(t *testing.T, clientVersions []KafkaVersion, codecs []Compressi
prodCfg.Net.MaxOpenRequests = 1
}

p, err := NewSyncProducer(kafkaBrokers, prodCfg)
p, err := NewSyncProducer(FunctionalTestEnv.KafkaBrokerAddrs, prodCfg)
if err != nil {
t.Errorf("Failed to create producer: version=%s, compression=%s, err=%v", prodVer, codec, err)
continue
Expand Down Expand Up @@ -251,7 +253,7 @@ consumerVersionLoop:
// message.
consCfg := NewConfig()
consCfg.Version = consVer
c, err := NewConsumer(kafkaBrokers, consCfg)
c, err := NewConsumer(FunctionalTestEnv.KafkaBrokerAddrs, consCfg)
if err != nil {
t.Fatal(err)
}
Expand Down
4 changes: 3 additions & 1 deletion functional_offset_manager_test.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
//+build functional

package sarama

import (
Expand All @@ -9,7 +11,7 @@ func TestFuncOffsetManager(t *testing.T) {
setupFunctionalTest(t)
defer teardownFunctionalTest(t)

client, err := NewClient(kafkaBrokers, nil)
client, err := NewClient(FunctionalTestEnv.KafkaBrokerAddrs, nil)
if err != nil {
t.Fatal(err)
}
Expand Down
Loading

0 comments on commit ad70aac

Please sign in to comment.