From f33a0afab27e9024c34d4d3ef4c37f8aea4d9f56 Mon Sep 17 00:00:00 2001
From: KJ Tsanaktsidis
Date: Sat, 9 May 2020 18:11:56 +1000
Subject: [PATCH 01/16] Add docker-compose-based functional test harness

* Functional tests are skipped except when the `functional` build tag is
  passed to the go test command (i.e. go test -tags=functional)
* If TOXIPROXY_ADDR is not set when the functional tests are being run, it
  will use docker-compose to automatically spin up a zookeeper/kafka/toxiproxy
  environment suitable for running the tests. This requires a working Docker
  installation and the docker-compose command line tool.
* If TOXIPROXY_ADDR and KAFKA_VERSION are set, then the tests will not spin
  up any docker infrastructure and will instead rely on whatever kafka broker
  is behind the specified toxiproxy.
---
 .gitignore                        |   3 +
 docker-compose.yml                | 134 ++++++++++++
 functional_client_test.go        |  10 +-
 functional_consumer_group_test.go |   6 +-
 functional_consumer_test.go       |  14 +-
 functional_offset_manager_test.go |   4 +-
 functional_producer_test.go       |  18 +-
 functional_test.go                | 386 ++++++++++++++++++++++++++----
 8 files changed, 504 insertions(+), 71 deletions(-)
 create mode 100644 docker-compose.yml

diff --git a/.gitignore b/.gitignore
index 6e362e4f2..eb4b19509 100644
--- a/.gitignore
+++ b/.gitignore
@@ -25,3 +25,6 @@ _testmain.go
 
 coverage.txt
 profile.out
+
+simplest-uncommitted-msg-0.1-jar-with-dependencies.jar
+
diff --git a/docker-compose.yml b/docker-compose.yml
new file mode 100644
index 000000000..03425a6f9
--- /dev/null
+++ b/docker-compose.yml
@@ -0,0 +1,134 @@
+version: '3.7'
+services:
+  zookeeper-1:
+    image: 'confluentinc/cp-zookeeper:5.5.0'
+    restart: always
+    environment:
+      ZOOKEEPER_SERVER_ID: '1'
+      ZOOKEEPER_SERVERS: 'zookeeper-1:2888:3888;zookeeper-2:2888:3888;zookeeper-3:2888:3888'
+      ZOOKEEPER_CLIENT_PORT: '2181'
+      ZOOKEEPER_PEER_PORT: '2888'
+      ZOOKEEPER_LEADER_PORT: '3888'
+      ZOOKEEPER_INIT_LIMIT: '10'
+      ZOOKEEPER_SYNC_LIMIT: '5'
+      ZOOKEEPER_MAX_CLIENT_CONNS: '0'
+  zookeeper-2:
+    image: 'confluentinc/cp-zookeeper:5.5.0'
+    restart: always
+    environment:
+      ZOOKEEPER_SERVER_ID: '2'
+      ZOOKEEPER_SERVERS: 'zookeeper-1:2888:3888;zookeeper-2:2888:3888;zookeeper-3:2888:3888'
+      ZOOKEEPER_CLIENT_PORT: '2181'
+      ZOOKEEPER_PEER_PORT: '2888'
+      ZOOKEEPER_LEADER_PORT: '3888'
+      ZOOKEEPER_INIT_LIMIT: '10'
+      ZOOKEEPER_SYNC_LIMIT: '5'
+      ZOOKEEPER_MAX_CLIENT_CONNS: '0'
+  zookeeper-3:
+    image: 'confluentinc/cp-zookeeper:5.5.0'
+    restart: always
+    environment:
+      ZOOKEEPER_SERVER_ID: '3'
+      ZOOKEEPER_SERVERS: 'zookeeper-1:2888:3888;zookeeper-2:2888:3888;zookeeper-3:2888:3888'
+      ZOOKEEPER_CLIENT_PORT: '2181'
+      ZOOKEEPER_PEER_PORT: '2888'
+      ZOOKEEPER_LEADER_PORT: '3888'
+      ZOOKEEPER_INIT_LIMIT: '10'
+      ZOOKEEPER_SYNC_LIMIT: '5'
+      ZOOKEEPER_MAX_CLIENT_CONNS: '0'
+  kafka-1:
+    image: 'confluentinc/cp-kafka:5.5.0'
+    restart: always
+    environment:
+      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181'
+      KAFKA_LISTENERS: 'LISTENER_INTERNAL://:9091,LISTENER_LOCAL://:29091'
+      KAFKA_ADVERTISED_LISTENERS: 'LISTENER_INTERNAL://kafka-1:9091,LISTENER_LOCAL://localhost:29091'
+      KAFKA_INTER_BROKER_LISTENER_NAME: 'LISTENER_INTERNAL'
+      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'LISTENER_INTERNAL:PLAINTEXT,LISTENER_LOCAL:PLAINTEXT'
+      KAFKA_DEFAULT_REPLICATION_FACTOR: '2'
+      KAFKA_BROKER_ID: '1'
+      KAFKA_BROKER_RACK: '1'
+      KAFKA_ZOOKEEPER_SESSION_TIMEOUT_MS: '3000'
+      KAFKA_ZOOKEEPER_CONNECTION_TIMEOUT_MS: '3000'
+      KAFKA_REPLICA_SELECTOR_CLASS: 'org.apache.kafka.common.replica.RackAwareReplicaSelector'
+      KAFKA_DELETE_TOPIC_ENABLE: 'true'
+      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'false'
+  kafka-2:
+    image: 'confluentinc/cp-kafka:5.5.0'
+    restart: always
+    environment:
+      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181'
+      KAFKA_LISTENERS: 'LISTENER_INTERNAL://:9091,LISTENER_LOCAL://:29092'
+      KAFKA_ADVERTISED_LISTENERS: 'LISTENER_INTERNAL://kafka-2:9091,LISTENER_LOCAL://localhost:29092'
+      KAFKA_INTER_BROKER_LISTENER_NAME: 'LISTENER_INTERNAL'
+      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'LISTENER_INTERNAL:PLAINTEXT,LISTENER_LOCAL:PLAINTEXT'
+      KAFKA_DEFAULT_REPLICATION_FACTOR: '2'
+      KAFKA_BROKER_ID: '2'
+      KAFKA_BROKER_RACK: '2'
+      KAFKA_ZOOKEEPER_SESSION_TIMEOUT_MS: '3000'
+      KAFKA_ZOOKEEPER_CONNECTION_TIMEOUT_MS: '3000'
+      KAFKA_REPLICA_SELECTOR_CLASS: 'org.apache.kafka.common.replica.RackAwareReplicaSelector'
+      KAFKA_DELETE_TOPIC_ENABLE: 'true'
+      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'false'
+  kafka-3:
+    image: 'confluentinc/cp-kafka:5.5.0'
+    restart: always
+    environment:
+      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181'
+      KAFKA_LISTENERS: 'LISTENER_INTERNAL://:9091,LISTENER_LOCAL://:29093'
+      KAFKA_ADVERTISED_LISTENERS: 'LISTENER_INTERNAL://kafka-3:9091,LISTENER_LOCAL://localhost:29093'
+      KAFKA_INTER_BROKER_LISTENER_NAME: 'LISTENER_INTERNAL'
+      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'LISTENER_INTERNAL:PLAINTEXT,LISTENER_LOCAL:PLAINTEXT'
+      KAFKA_DEFAULT_REPLICATION_FACTOR: '2'
+      KAFKA_BROKER_ID: '3'
+      KAFKA_BROKER_RACK: '3'
+      KAFKA_ZOOKEEPER_SESSION_TIMEOUT_MS: '3000'
+      KAFKA_ZOOKEEPER_CONNECTION_TIMEOUT_MS: '3000'
+      KAFKA_REPLICA_SELECTOR_CLASS: 'org.apache.kafka.common.replica.RackAwareReplicaSelector'
+      KAFKA_DELETE_TOPIC_ENABLE: 'true'
+      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'false'
+  kafka-4:
+    image: 'confluentinc/cp-kafka:5.5.0'
+    restart: always
+    environment:
+      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181'
+      KAFKA_LISTENERS: 'LISTENER_INTERNAL://:9091,LISTENER_LOCAL://:29094'
+      KAFKA_ADVERTISED_LISTENERS: 'LISTENER_INTERNAL://kafka-4:9091,LISTENER_LOCAL://localhost:29094'
+      KAFKA_INTER_BROKER_LISTENER_NAME: 'LISTENER_INTERNAL'
+      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'LISTENER_INTERNAL:PLAINTEXT,LISTENER_LOCAL:PLAINTEXT'
+      KAFKA_DEFAULT_REPLICATION_FACTOR: '2'
+      KAFKA_BROKER_ID: '4'
+      KAFKA_BROKER_RACK: '4'
+      KAFKA_ZOOKEEPER_SESSION_TIMEOUT_MS: '3000'
+      KAFKA_ZOOKEEPER_CONNECTION_TIMEOUT_MS: '3000'
+      KAFKA_REPLICA_SELECTOR_CLASS: 'org.apache.kafka.common.replica.RackAwareReplicaSelector'
+      KAFKA_DELETE_TOPIC_ENABLE: 'true'
+      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'false'
+  kafka-5:
+    image: 'confluentinc/cp-kafka:5.5.0'
+    restart: always
+    environment:
+      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181'
+      KAFKA_LISTENERS: 'LISTENER_INTERNAL://:9091,LISTENER_LOCAL://:29095'
+      KAFKA_ADVERTISED_LISTENERS: 'LISTENER_INTERNAL://kafka-5:9091,LISTENER_LOCAL://localhost:29095'
+      KAFKA_INTER_BROKER_LISTENER_NAME: 'LISTENER_INTERNAL'
+      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'LISTENER_INTERNAL:PLAINTEXT,LISTENER_LOCAL:PLAINTEXT'
+      KAFKA_DEFAULT_REPLICATION_FACTOR: '2'
+      KAFKA_BROKER_ID: '5'
+      KAFKA_BROKER_RACK: '5'
+      KAFKA_ZOOKEEPER_SESSION_TIMEOUT_MS: '3000'
+      KAFKA_ZOOKEEPER_CONNECTION_TIMEOUT_MS: '3000'
+      KAFKA_REPLICA_SELECTOR_CLASS: 'org.apache.kafka.common.replica.RackAwareReplicaSelector'
+      KAFKA_DELETE_TOPIC_ENABLE: 'true'
+      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'false'
+  toxiproxy:
+    image: 'shopify/toxiproxy:2.1.4'
+    ports:
+      # The tests themselves actually start the proxies on these ports
+      - '29091:29091'
+      - '29092:29092'
+      - '29093:29093'
+      - '29094:29094'
+      - '29095:29095'
+      # This is the toxiproxy API port
+      - '8474:8474'
diff --git a/functional_client_test.go b/functional_client_test.go
index 2bf99d252..513b8ee9b 100644
--- a/functional_client_test.go
+++ b/functional_client_test.go
@@ -1,3 +1,5 @@
+//+build functional
+
 package sarama
 
 import (
@@ -10,13 +12,13 @@ func TestFuncConnectionFailure(t *testing.T) {
 	setupFunctionalTest(t)
 	defer teardownFunctionalTest(t)
 
-	Proxies["kafka1"].Enabled = false
+	FunctionalTestEnv.Proxies["kafka1"].Enabled = false
 	SaveProxy(t, "kafka1")
 
 	config := NewConfig()
 	config.Metadata.Retry.Max = 1
 
-	_, err := NewClient([]string{kafkaBrokers[0]}, config)
+	_, err := NewClient([]string{FunctionalTestEnv.KafkaBrokerAddrs[0]}, config)
 	if err != ErrOutOfBrokers {
 		t.Fatal("Expected returned error to be ErrOutOfBrokers, but was: ", err)
 	}
@@ -29,7 +31,7 @@ func TestFuncClientMetadata(t *testing.T) {
 	config := NewConfig()
 	config.Metadata.Retry.Max = 1
 	config.Metadata.Retry.Backoff = 10 * time.Millisecond
-	client, err := NewClient(kafkaBrokers, config)
+	client, err := NewClient(FunctionalTestEnv.KafkaBrokerAddrs, config)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -70,7 +72,7 @@ func TestFuncClientCoordinator(t *testing.T) {
 	setupFunctionalTest(t)
 	defer teardownFunctionalTest(t)
 
-	client, err := NewClient(kafkaBrokers, nil)
+	client, err := NewClient(FunctionalTestEnv.KafkaBrokerAddrs, nil)
 	if err != nil {
 		t.Fatal(err)
 	}
diff --git a/functional_consumer_group_test.go b/functional_consumer_group_test.go
index ae376086d..4d71510a8 100644
--- a/functional_consumer_group_test.go
+++ b/functional_consumer_group_test.go
@@ -1,4 +1,4 @@
-// +build go1.9
+//+build functional
 
 package sarama
 
@@ -153,7 +153,7 @@ func testFuncConsumerGroupID(t *testing.T) string {
 }
 
 func testFuncConsumerGroupFuzzySeed(topic string) error {
-	client, err := NewClient(kafkaBrokers, nil)
+	client, err := NewClient(FunctionalTestEnv.KafkaBrokerAddrs, nil)
 	if err != nil {
 		return err
 	}
@@ -245,7 +245,7 @@ func runTestFuncConsumerGroupMember(t *testing.T, groupID, clientID string, maxM
 	config.Consumer.Offsets.Initial = OffsetOldest
 	config.Consumer.Group.Rebalance.Timeout = 10 * time.Second
 
-	group, err := NewConsumerGroup(kafkaBrokers, groupID, config)
+	group, err := NewConsumerGroup(FunctionalTestEnv.KafkaBrokerAddrs, groupID, config)
 	if err != nil {
 		t.Fatal(err)
 		return nil
diff --git a/functional_consumer_test.go b/functional_consumer_test.go
index 8b31b45c5..aca9434db 100644
--- a/functional_consumer_test.go
+++ b/functional_consumer_test.go
@@ -1,3 +1,5 @@
+//+build functional
+
 package sarama
 
 import (
@@ -16,7 +18,7 @@ func TestFuncConsumerOffsetOutOfRange(t *testing.T) {
 	setupFunctionalTest(t)
 	defer teardownFunctionalTest(t)
 
-	consumer, err := NewConsumer(kafkaBrokers, nil)
+	consumer, err := NewConsumer(FunctionalTestEnv.KafkaBrokerAddrs, nil)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -36,7 +38,7 @@ func TestConsumerHighWaterMarkOffset(t *testing.T) {
 	setupFunctionalTest(t)
 	defer teardownFunctionalTest(t)
 
-	p, err := NewSyncProducer(kafkaBrokers, nil)
+	p, err := NewSyncProducer(FunctionalTestEnv.KafkaBrokerAddrs, nil)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -47,7 +49,7 @@ func TestConsumerHighWaterMarkOffset(t *testing.T) {
 		t.Fatal(err)
 	}
 
-	c, err := NewConsumer(kafkaBrokers, nil)
+	c, err := NewConsumer(FunctionalTestEnv.KafkaBrokerAddrs, nil)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -143,7 +145,7 @@ func TestReadOnlyAndAllCommittedMessages(t *testing.T) {
 	config.Consumer.IsolationLevel = ReadCommitted
 	config.Version = V0_11_0_0
 
-	consumer, err := NewConsumer(kafkaBrokers, config)
+	consumer, err := NewConsumer(FunctionalTestEnv.KafkaBrokerAddrs, config)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -205,7 +207,7 @@ func produceMsgs(t *testing.T, clientVersions []KafkaVersion, codecs []Compressi
 		prodCfg.Net.MaxOpenRequests = 1
 	}
 
-	p, err := NewSyncProducer(kafkaBrokers, prodCfg)
+	p, err := NewSyncProducer(FunctionalTestEnv.KafkaBrokerAddrs, prodCfg)
 	if err != nil {
 		t.Errorf("Failed to create producer: version=%s, compression=%s, err=%v", prodVer, codec, err)
 		continue
@@ -251,7 +253,7 @@ consumerVersionLoop:
 		// message.
 		consCfg := NewConfig()
 		consCfg.Version = consVer
-		c, err := NewConsumer(kafkaBrokers, consCfg)
+		c, err := NewConsumer(FunctionalTestEnv.KafkaBrokerAddrs, consCfg)
 		if err != nil {
 			t.Fatal(err)
 		}
diff --git a/functional_offset_manager_test.go b/functional_offset_manager_test.go
index 436f35ef4..32e160aab 100644
--- a/functional_offset_manager_test.go
+++ b/functional_offset_manager_test.go
@@ -1,3 +1,5 @@
+//+build functional
+
 package sarama
 
 import (
@@ -9,7 +11,7 @@ func TestFuncOffsetManager(t *testing.T) {
 	setupFunctionalTest(t)
 	defer teardownFunctionalTest(t)
 
-	client, err := NewClient(kafkaBrokers, nil)
+	client, err := NewClient(FunctionalTestEnv.KafkaBrokerAddrs, nil)
 	if err != nil {
 		t.Fatal(err)
 	}
diff --git a/functional_producer_test.go b/functional_producer_test.go
index e589a8eb8..0f86368d3 100644
--- a/functional_producer_test.go
+++ b/functional_producer_test.go
@@ -1,3 +1,5 @@
+//+build functional
+
 package sarama
 
 import (
@@ -53,7 +55,7 @@ func TestFuncMultiPartitionProduce(t *testing.T) {
 	config.Producer.Flush.Frequency = 50 * time.Millisecond
 	config.Producer.Flush.Messages = 200
 	config.Producer.Return.Successes = true
-	producer, err := NewSyncProducer(kafkaBrokers, config)
+	producer, err := NewSyncProducer(FunctionalTestEnv.KafkaBrokerAddrs, config)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -81,7 +83,7 @@ func TestFuncProducingToInvalidTopic(t *testing.T) {
 	setupFunctionalTest(t)
 	defer teardownFunctionalTest(t)
 
-	producer, err := NewSyncProducer(kafkaBrokers, nil)
+	producer, err := NewSyncProducer(FunctionalTestEnv.KafkaBrokerAddrs, nil)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -113,7 +115,7 @@ func TestFuncProducingIdempotentWithBrokerFailure(t *testing.T) {
 	config.Net.MaxOpenRequests = 1
 	config.Version = V0_11_0_0
 
-	producer, err := NewSyncProducer(kafkaBrokers, config)
+	producer, err := NewSyncProducer(FunctionalTestEnv.KafkaBrokerAddrs, config)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -131,7 +133,7 @@ func TestFuncProducingIdempotentWithBrokerFailure(t *testing.T) {
 	}
 
 	// break the brokers.
-	for proxyName, proxy := range Proxies {
+	for proxyName, proxy := range FunctionalTestEnv.Proxies {
 		if !strings.Contains(proxyName, "kafka") {
 			continue
 		}
@@ -152,7 +154,7 @@ func TestFuncProducingIdempotentWithBrokerFailure(t *testing.T) {
 	}
 
 	// Now bring the proxy back up
-	for proxyName, proxy := range Proxies {
+	for proxyName, proxy := range FunctionalTestEnv.Proxies {
 		if !strings.Contains(proxyName, "kafka") {
 			continue
 		}
@@ -179,7 +181,7 @@ func testProducingMessages(t *testing.T, config *Config) {
 	defer teardownFunctionalTest(t)
 
 	// Configure some latency in order to properly validate the request latency metric
-	for _, proxy := range Proxies {
+	for _, proxy := range FunctionalTestEnv.Proxies {
 		if _, err := proxy.AddToxic("", "latency", "", 1, toxiproxy.Attributes{"latency": 10}); err != nil {
 			t.Fatal("Unable to configure latency toxicity", err)
 		}
@@ -188,7 +190,7 @@ func testProducingMessages(t *testing.T, config *Config) {
 	config.Producer.Return.Successes = true
 	config.Consumer.Return.Errors = true
 
-	client, err := NewClient(kafkaBrokers, config)
+	client, err := NewClient(FunctionalTestEnv.KafkaBrokerAddrs, config)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -380,7 +382,7 @@ func benchmarkProducer(b *testing.B, conf *Config, topic string, value Encoder) {
 		}()
 	}
 
-	producer, err := NewAsyncProducer(kafkaBrokers, conf)
+	producer, err := NewAsyncProducer(FunctionalTestEnv.KafkaBrokerAddrs, conf)
 	if err != nil {
 		b.Fatal(err)
 	}
diff --git a/functional_test.go b/functional_test.go
index 778d9e055..e56182be0 100644
--- a/functional_test.go
+++ b/functional_test.go
@@ -1,79 +1,378 @@
+//+build functional
+
 package sarama
 
 import (
+	"context"
+	"fmt"
+	toxiproxy "github.com/Shopify/toxiproxy/client"
+	"io"
 	"log"
-	"math/rand"
 	"net"
+	"net/http"
+	"net/url"
 	"os"
+	"os/exec"
+	"path/filepath"
 	"strconv"
 	"strings"
 	"testing"
 	"time"
-
-	toxiproxy "github.com/Shopify/toxiproxy/client"
 )
 
 const (
-	VagrantToxiproxy      = "http://192.168.100.67:8474"
-	VagrantKafkaPeers     = "192.168.100.67:9091,192.168.100.67:9092,192.168.100.67:9093,192.168.100.67:9094,192.168.100.67:9095"
-	VagrantZookeeperPeers = "192.168.100.67:2181,192.168.100.67:2182,192.168.100.67:2183,192.168.100.67:2184,192.168.100.67:2185"
+	uncomittedMsgJar = "https://github.com/FrancoisPoinsot/simplest-uncommitted-msg/releases/download/0.1/simplest-uncommitted-msg-0.1-jar-with-dependencies.jar"
 )
 
 var (
-	kafkaAvailable, kafkaRequired bool
-	kafkaBrokers                  []string
+	testTopicDetails = map[string]*TopicDetail{
+		"test.1": {
+			NumPartitions: 1,
+			ReplicationFactor: 3,
+		},
+		"test.4": {
+			NumPartitions: 4,
+			ReplicationFactor: 3,
+		},
+		"test.64": {
+			NumPartitions: 64,
+			ReplicationFactor: 3,
+		},
+		"uncommitted-topic-test-4": {
+			NumPartitions: 1,
+			ReplicationFactor: 3,
+		},
+	}
 
-	proxyClient *toxiproxy.Client
-	Proxies     map[string]*toxiproxy.Proxy
+	FunctionalTestEnv *testEnvironment
 )
 
-func init() {
+func TestMain(m *testing.M) {
+	// Functional tests for Sarama
+	//
+	// You can either set TOXIPROXY_ADDR, which points at a toxiproxy address
+	// already set up with 21801-21805 bound to zookeeper and 29091-29095
+	// bound to kafka. Alternatively, if TOXIPROXY_ADDR is not set, we'll try
+	// and use Docker to bring up a 3-node zookeeper cluster & 5-node kafka
+	// cluster, with toxiproxy configured as above.
+	//
+	// In either case, the following topics will be deleted (if they exist) and
+	// then created/pre-seeded with data for the functional test run:
+	//   * uncommitted-topic-test-4
+	//   * test.1
+	//   * test.4
+	//   * test.64
+	os.Exit(testMain(m))
+}
+
+func testMain(m *testing.M) int {
+	ctx := context.Background()
+	var env testEnvironment
+
 	if os.Getenv("DEBUG") == "true" {
 		Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)
 	}
 
-	seed := time.Now().UTC().UnixNano()
-	if tmp := os.Getenv("TEST_SEED"); tmp != "" {
-		seed, _ = strconv.ParseInt(tmp, 0, 64)
+	usingExisting, err := existingEnvironment(ctx, &env)
+	if err != nil {
+		panic(err)
+	}
+	if !usingExisting {
+		err := prepareDockerTestEnvironment(ctx, &env)
+		if err != nil {
+			tearDownDockerTestEnvironment(ctx, &env)
+			panic(err)
+		}
+		defer tearDownDockerTestEnvironment(ctx, &env)
 	}
-	Logger.Println("Using random seed:", seed)
-	rand.Seed(seed)
+	if err := prepareTestTopics(ctx, &env); err != nil {
+		panic(err)
+	}
+	FunctionalTestEnv = &env
+	return m.Run()
+}
+
+type testEnvironment struct {
+	ToxiproxyClient  *toxiproxy.Client
+	Proxies          map[string]*toxiproxy.Proxy
+	KafkaBrokerAddrs []string
+	KafkaVersion     string
+}
+
+func prepareDockerTestEnvironment(ctx context.Context, env *testEnvironment) error {
+	Logger.Println("bringing up docker-based test environment")
 
-	proxyAddr := os.Getenv("TOXIPROXY_ADDR")
-	if proxyAddr == "" {
-		proxyAddr = VagrantToxiproxy
+	// Always (try to) tear down first.
+	if err := tearDownDockerTestEnvironment(ctx, env); err != nil {
+		return fmt.Errorf("failed to tear down existing env: %w", err)
 	}
-	proxyClient = toxiproxy.NewClient(proxyAddr)
 
-	kafkaPeers := os.Getenv("KAFKA_PEERS")
-	if kafkaPeers == "" {
-		kafkaPeers = VagrantKafkaPeers
+	c := exec.Command("docker-compose", "up", "-d")
+	c.Stdout = os.Stdout
+	c.Stderr = os.Stderr
+	err := c.Run()
+	if err != nil {
+		return fmt.Errorf("failed to run docker-compose to start test enviroment: %w", err)
 	}
-	kafkaBrokers = strings.Split(kafkaPeers, ",")
 
-	if c, err := net.DialTimeout("tcp", kafkaBrokers[0], 5*time.Second); err == nil {
-		if err = c.Close(); err == nil {
-			kafkaAvailable = true
+	// Set up toxiproxy Proxies
+	env.ToxiproxyClient = toxiproxy.NewClient("localhost:8474")
+	env.Proxies = map[string]*toxiproxy.Proxy{}
+	for i := 1; i <= 5; i++ {
+		proxyName := fmt.Sprintf("kafka%d", i)
+		proxy, err := env.ToxiproxyClient.CreateProxy(
+			proxyName,
+			fmt.Sprintf("0.0.0.0:%d", 29090 + i),
+			fmt.Sprintf("kafka-%d:%d", i, 29090 + i),
+		)
+		if err != nil {
+			return fmt.Errorf("failed to create toxiproxy: %w", err)
 		}
+		env.Proxies[proxyName] = proxy
+		env.KafkaBrokerAddrs = append(env.KafkaBrokerAddrs, fmt.Sprintf("127.0.0.1:%d", 29090 + i))
 	}
 
-	kafkaRequired = os.Getenv("CI") != ""
+	// the mapping of confluent platform docker image versions -> kafka versions can be
+	// found here: https://docs.confluent.io/current/installation/versions-interoperability.html
+	// We have cp-5.5.0 in the docker-compose file, so that's kafka 2.5.0.
+	env.KafkaVersion = "2.5.0"
+
+	// Wait for the kafka broker to come up
+	allBrokersUp := false
+	for i := 0; i < 45 && !allBrokersUp; i++ {
+		Logger.Println("waiting for kafka brokers to come up")
+		time.Sleep(1 * time.Second)
+		config := NewConfig()
+		config.Version, err = ParseKafkaVersion(env.KafkaVersion)
+		if err != nil {
+			return err
+		}
+		config.Net.DialTimeout = 1 * time.Second
+		config.Net.ReadTimeout = 1 * time.Second
+		config.Net.WriteTimeout = 1 * time.Second
+		config.ClientID = "sarama-tests"
+		brokersOk := make([]bool, len(env.KafkaBrokerAddrs))
+	retryLoop:
+		for j, addr := range env.KafkaBrokerAddrs {
+			client, err := NewClient([]string{addr},config)
+			if err != nil {
+				continue
+			}
+			err = client.RefreshMetadata()
+			if err != nil {
+				continue
+			}
+			brokers := client.Brokers()
+			if len(brokers) < 5 {
+				continue
+			}
+			for _, broker := range brokers {
+				err := broker.Open(client.Config())
+				if err != nil {
+					continue retryLoop
+				}
+				connected, err := broker.Connected()
+				if err != nil || !connected {
+					continue retryLoop
+				}
+			}
+			brokersOk[j] = true
+		}
+		allBrokersUp = true
+		for _, u := range brokersOk {
+			allBrokersUp = allBrokersUp && u
+		}
+	}
+	if !allBrokersUp {
+		return fmt.Errorf("timed out waiting for broker to come up")
+	}
+
+	return nil
 }
 
-func checkKafkaAvailability(t testing.TB) {
-	if !kafkaAvailable {
-		if kafkaRequired {
-			t.Fatalf("Kafka broker is not available on %s. Set KAFKA_PEERS to connect to Kafka on a different location.", kafkaBrokers[0])
-		} else {
-			t.Skipf("Kafka broker is not available on %s. Set KAFKA_PEERS to connect to Kafka on a different location.", kafkaBrokers[0])
-		}
-	}
-}
-
+func existingEnvironment(ctx context.Context, env *testEnvironment) (bool, error) {
+	toxiproxyAddr, ok := os.LookupEnv("TOXIPROXY_ADDR")
+	if !ok {
+		return false, nil
+	}
+	toxiproxyURL, err := url.Parse(toxiproxyAddr)
+	if err != nil {
+		return false, fmt.Errorf("$TOXIPROXY_ADDR not parseable as url")
+	}
+	toxiproxyHost := toxiproxyURL.Hostname()
+
+	env.ToxiproxyClient = toxiproxy.NewClient(toxiproxyAddr)
+	for i := 1; i <= 5; i++ {
+		proxyName := fmt.Sprintf("kafka%d", i)
+		proxy, err := env.ToxiproxyClient.Proxy(proxyName)
+		if err != nil {
+			return false, fmt.Errorf("no proxy kafka%d on toxiproxy: %w", i, err)
+		}
+		env.Proxies[proxyName] = proxy
+		// get the host:port from the proxy & toxiproxy addr, so we can do "$toxiproxy_addr:$proxy_port"
+		_, proxyPort, err := net.SplitHostPort(proxy.Listen)
+		if err != nil {
+			return false, fmt.Errorf("proxy.Listen not a host:port combo: %w", err)
+		}
+		env.KafkaBrokerAddrs = append(env.KafkaBrokerAddrs, fmt.Sprintf("%s:%s", toxiproxyHost, proxyPort))
+	}
+
+	env.KafkaVersion, ok = os.LookupEnv("KAFKA_VERSION")
+	if !ok {
+		return false, fmt.Errorf("KAFKA_VERSION needs to be provided with TOXIPROXY_ADDR")
+	}
+	return true, nil
+}
+
+func tearDownDockerTestEnvironment(ctx context.Context, env *testEnvironment) error {
+	c := exec.Command("docker-compose", "down", "--volumes")
+	c.Stdout = os.Stdout
+	c.Stderr = os.Stderr
+	downErr := c.Run()
+
+	c = exec.Command("docker-compose", "rm", "-v", "--force", "--stop")
+	c.Stdout = os.Stdout
+	c.Stderr = os.Stderr
+	rmErr := c.Run()
+	if downErr != nil {
+		return fmt.Errorf("failed to run docker-compose to stop test enviroment: %w", downErr)
+	}
+	if rmErr != nil {
+		return fmt.Errorf("failed to run docker-compose to rm test enviroment: %w", rmErr)
+	}
+	return nil
+}
+
+func prepareTestTopics(ctx context.Context, env *testEnvironment) error {
+	Logger.Println("creating test topics")
+	var testTopicNames []string
+	for topic, _ := range testTopicDetails {
+		testTopicNames = append(testTopicNames, topic)
+	}
+
+	Logger.Println("Creating topics")
+	config := NewConfig()
+	config.Metadata.Retry.Max = 5
+	config.Metadata.Retry.Backoff = 10 * time.Second
+	config.ClientID = "sarama-tests"
+	var err error
+	config.Version, err = ParseKafkaVersion(env.KafkaVersion)
+	if err != nil {
+		return fmt.Errorf("failed to parse kafka version %s: %w", env.KafkaVersion, err)
+	}
+
+	client, err := NewClient(env.KafkaBrokerAddrs, config)
+	if err != nil {
+		return fmt.Errorf("failed to connect to kafka: %w", err)
+	}
+	defer client.Close()
+
+	controller, err := client.Controller()
+	if err != nil {
+		return fmt.Errorf("failed to connect to kafka controller: %w", err)
+	}
+	defer controller.Close()
+
+	// Start by deleting the test topics (if they already exist)
+	deleteRes, err := controller.DeleteTopics(&DeleteTopicsRequest{
+		Topics: testTopicNames,
+		Timeout: 30 * time.Second,
+	})
+	if err != nil {
+		return fmt.Errorf("failed to delete test topics: %w", err)
+	}
+	for topic, topicErr := range deleteRes.TopicErrorCodes {
+		if !isTopicNotExistsErrorOrOk(topicErr) {
+			return fmt.Errorf("failed to delete topic %s: %w", topic, topicErr)
+		}
+	}
+
+	// wait for the topics to _actually_ be gone - the delete is not guaranteed to be processed
+	// synchronously
+	var topicsOk bool
+	for i := 0; i < 20 && !topicsOk; i++ {
+		time.Sleep(1 * time.Second)
+		md, err := controller.GetMetadata(&MetadataRequest{
+			Topics: testTopicNames,
+		})
+		if err != nil {
+			return fmt.Errorf("failed to get metadata for test topics: %w", err)
+		}
+
+		topicsOk = true
+		for _, topicsMd := range md.Topics {
+			if !isTopicNotExistsErrorOrOk(topicsMd.Err) {
+				topicsOk = false
+			}
+		}
+	}
+	if !topicsOk {
+		return fmt.Errorf("timed out waiting for test topics to be gone")
+	}
+
+	// now create the topics empty
+	createRes, err := controller.CreateTopics(&CreateTopicsRequest{
+		TopicDetails: testTopicDetails,
+		Timeout: 30 * time.Second,
+	})
+	if err != nil {
+		return fmt.Errorf("failed to create test topics: %w", err)
+	}
+	for topic, topicErr := range createRes.TopicErrors {
+		if !isTopicExistsErrorOrOk(topicErr.Err) {
+			return fmt.Errorf("failed to create test topic %s: %w", topic, topicErr)
+		}
+	}
+
+	// This is kind of gross, but we don't actually have support for doing transactional publishing
+	// with sarama, so we need to use a java-based tool to publish uncommitted messages to
+	// the uncommitted-topic-test-4 topic
+	jarName := filepath.Base(uncomittedMsgJar)
+	if _, err := os.Stat(jarName); err != nil {
+		Logger.Printf("Downloading %s\n", uncomittedMsgJar)
+		req, err := http.NewRequest("GET", uncomittedMsgJar, nil)
+		if err != nil {
+			return fmt.Errorf("failed creating request for uncommitted msg jar: %w", err)
+		}
+		res, err := http.DefaultClient.Do(req)
+		if err != nil {
+			return fmt.Errorf("failed fetching the uncommitted msg jar: %w", err)
+		}
+		defer res.Body.Close()
+		jarFile, err := os.OpenFile(jarName, os.O_WRONLY | os.O_TRUNC | os.O_CREATE, 0644)
+		if err != nil {
+			return fmt.Errorf("failed opening the uncommitted msg jar: %w", err)
+		}
+		defer jarFile.Close()
+
+		_, err = io.Copy(jarFile, res.Body)
+		if err != nil {
+			return fmt.Errorf("failed writing the uncommitted msg jar: %w", err)
+		}
+	}
+
+	c := exec.Command("java", "-jar", jarName, "-b", env.KafkaBrokerAddrs[0], "-c", "4")
+	c.Stdout = os.Stdout
+	c.Stderr = os.Stderr
+	err = c.Run()
+	if err != nil {
+		return fmt.Errorf("failed running uncommitted msg jar: %w", err)
+	}
+	return nil
+}
+
+func isTopicNotExistsErrorOrOk(err KError) bool {
+	return err == ErrUnknownTopicOrPartition || err == ErrInvalidTopic || err == ErrNoError
+}
+
+func isTopicExistsErrorOrOk(err KError) bool {
+	return err == ErrTopicAlreadyExists || err == ErrNoError
+}
+
 func checkKafkaVersion(t testing.TB, requiredVersion string) {
-	kafkaVersion := os.Getenv("KAFKA_VERSION")
+	kafkaVersion := FunctionalTestEnv.KafkaVersion
 	if kafkaVersion == "" {
-		t.Logf("No KAFKA_VERSION set. This test requires Kafka version %s or higher. Continuing...", requiredVersion)
+		t.Skipf("No KAFKA_VERSION set. This test requires Kafka version %s or higher. Continuing...", requiredVersion)
 	} else {
 		available := parseKafkaVersion(kafkaVersion)
 		required := parseKafkaVersion(requiredVersion)
@@ -84,30 +383,19 @@ func checkKafkaVersion(t testing.TB, requiredVersion string) {
 }
 
 func resetProxies(t testing.TB) {
-	if err := proxyClient.ResetState(); err != nil {
+	if err := FunctionalTestEnv.ToxiproxyClient.ResetState(); err != nil {
 		t.Error(err)
 	}
-	Proxies = nil
-}
-
-func fetchProxies(t testing.TB) {
-	var err error
-	Proxies, err = proxyClient.Proxies()
-	if err != nil {
-		t.Fatal(err)
-	}
 }
 
 func SaveProxy(t *testing.T, px string) {
-	if err := Proxies[px].Save(); err != nil {
+	if err := FunctionalTestEnv.Proxies[px].Save(); err != nil {
 		t.Fatal(err)
 	}
 }
 
 func setupFunctionalTest(t testing.TB) {
-	checkKafkaAvailability(t)
 	resetProxies(t)
-	fetchProxies(t)
 }
 
 func teardownFunctionalTest(t testing.TB) {

From 75ff946b5267463981395104c6988cb6b3a764d7 Mon Sep 17 00:00:00 2001
From: KJ Tsanaktsidis
Date: Sun, 10 May 2020 16:02:36 +1000
Subject: [PATCH 02/16] Map confluent platform -> kafka version

---
 Makefile           |  4 ++++
 docker-compose.yml | 16 ++++++++--------
 functional_test.go | 21 +++++++++++++++++++++
 3 files changed, 33 insertions(+), 8 deletions(-)

diff --git a/Makefile b/Makefile
index c3b431a56..18981cec9 100644
--- a/Makefile
+++ b/Makefile
@@ -25,3 +25,7 @@ lint:
 
 test:
 	$(GOTEST) ./...
+
+.PHONY: test_functional
+test_functional:
+	$(GOTEST) -tags=functional ./...
diff --git a/docker-compose.yml b/docker-compose.yml index 03425a6f9..25593fd3b 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,7 +1,7 @@ version: '3.7' services: zookeeper-1: - image: 'confluentinc/cp-zookeeper:5.5.0' + image: 'confluentinc/cp-zookeeper:${CONFLUENT_PLATFORM_VERSION:-5.5.0}' restart: always environment: ZOOKEEPER_SERVER_ID: '1' @@ -13,7 +13,7 @@ services: ZOOKEEPER_SYNC_LIMIT: '5' ZOOKEEPER_MAX_CLIENT_CONNS: '0' zookeeper-2: - image: 'confluentinc/cp-zookeeper:5.5.0' + image: 'confluentinc/cp-zookeeper:${CONFLUENT_PLATFORM_VERSION:-5.5.0}' restart: always environment: ZOOKEEPER_SERVER_ID: '2' @@ -25,7 +25,7 @@ services: ZOOKEEPER_SYNC_LIMIT: '5' ZOOKEEPER_MAX_CLIENT_CONNS: '0' zookeeper-3: - image: 'confluentinc/cp-zookeeper:5.5.0' + image: 'confluentinc/cp-zookeeper:${CONFLUENT_PLATFORM_VERSION:-5.5.0}' restart: always environment: ZOOKEEPER_SERVER_ID: '3' @@ -37,7 +37,7 @@ services: ZOOKEEPER_SYNC_LIMIT: '5' ZOOKEEPER_MAX_CLIENT_CONNS: '0' kafka-1: - image: 'confluentinc/cp-kafka:5.5.0' + image: 'confluentinc/cp-kafka:${CONFLUENT_PLATFORM_VERSION:-5.5.0}' restart: always environment: KAFKA_ZOOKEEPER_CONNECT: 'zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181' @@ -54,7 +54,7 @@ services: KAFKA_DELETE_TOPIC_ENABLE: 'true' KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'false' kafka-2: - image: 'confluentinc/cp-kafka:5.5.0' + image: 'confluentinc/cp-kafka:${CONFLUENT_PLATFORM_VERSION:-5.5.0}' restart: always environment: KAFKA_ZOOKEEPER_CONNECT: 'zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181' @@ -71,7 +71,7 @@ services: KAFKA_DELETE_TOPIC_ENABLE: 'true' KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'false' kafka-3: - image: 'confluentinc/cp-kafka:5.5.0' + image: 'confluentinc/cp-kafka:${CONFLUENT_PLATFORM_VERSION:-5.5.0}' restart: always environment: KAFKA_ZOOKEEPER_CONNECT: 'zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181' @@ -88,7 +88,7 @@ services: KAFKA_DELETE_TOPIC_ENABLE: 'true' KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'false' kafka-4: - image: 'confluentinc/cp-kafka:5.5.0' + image: 'confluentinc/cp-kafka:${CONFLUENT_PLATFORM_VERSION:-5.5.0}' restart: always environment: KAFKA_ZOOKEEPER_CONNECT: 'zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181' @@ -105,7 +105,7 @@ services: KAFKA_DELETE_TOPIC_ENABLE: 'true' KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'false' kafka-5: - image: 'confluentinc/cp-kafka:5.5.0' + image: 'confluentinc/cp-kafka:${CONFLUENT_PLATFORM_VERSION:-5.5.0}' restart: always environment: KAFKA_ZOOKEEPER_CONNECT: 'zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181' diff --git a/functional_test.go b/functional_test.go index e56182be0..0e0c9e216 100644 --- a/functional_test.go +++ b/functional_test.go @@ -107,9 +107,30 @@ func prepareDockerTestEnvironment(ctx context.Context, env *testEnvironment) err return fmt.Errorf("failed to tear down existing env: %w", err) } + if version, ok := os.LookupEnv("KAFKA_VERSION"); ok { + env.KafkaVersion = version + } else { + // We have cp-5.5.0 as the default in the docker-compose file, so that's kafka 2.5.0. 
+ env.KafkaVersion = "2.5.0" + } + + // the mapping of confluent platform docker image versions -> kafka versions can be + // found here: https://docs.confluent.io/current/installation/versions-interoperability.html + var confluentPlatformVersion string + switch env.KafkaVersion { + case "2.5.0": + confluentPlatformVersion = "5.5.0" + case "2.4.1": + confluentPlatformVersion = "5.4.2" + default: + return fmt.Errorf("don't know what confluent platform version to use for kafka %s", env.KafkaVersion) + } + + c := exec.Command("docker-compose", "up", "-d") c.Stdout = os.Stdout c.Stderr = os.Stderr + c.Env = append(os.Environ(), fmt.Sprintf("CONFLUENT_PLATFORM_VERSION=%s", confluentPlatformVersion)) err := c.Run() if err != nil { return fmt.Errorf("failed to run docker-compose to start test enviroment: %w", err) From ddb342c2a4143262cfdde12885f86ea4fbf08847 Mon Sep 17 00:00:00 2001 From: KJ Tsanaktsidis Date: Sun, 10 May 2020 16:11:14 +1000 Subject: [PATCH 03/16] Use the built-in docker compose support to run CI tests --- .github/workflows/ci.yml | 14 +------------- 1 file changed, 1 insertion(+), 13 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 116deb5e5..fca6cb201 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -14,13 +14,8 @@ jobs: platform: [ubuntu-latest] env: - KAFKA_PEERS: localhost:9091,localhost:9092,localhost:9093,localhost:9094,localhost:9095 - TOXIPROXY_ADDR: http://localhost:8474 - KAFKA_INSTALL_ROOT: /home/runner/kafka - KAFKA_HOSTNAME: localhost DEBUG: true KAFKA_VERSION: ${{ matrix.kafka-version }} - KAFKA_SCALA_VERSION: 2.12 steps: - uses: actions/checkout@v1 @@ -48,16 +43,9 @@ jobs: run: | curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(go env GOPATH)/bin v1.23.6 export REPOSITORY_ROOT=${GITHUB_WORKSPACE} - vagrant/install_cluster.sh - vagrant/boot_cluster.sh - vagrant/create_topics.sh - vagrant/run_java_producer.sh - name: Run test suite - run: make test + run: make test_functional - name: Run linter run: make lint - - - name: Teardown - run: vagrant/halt_cluster.sh From 42bbd066c0c6758fb18cd612e360f9fa0581308f Mon Sep 17 00:00:00 2001 From: KJ Tsanaktsidis Date: Sun, 10 May 2020 16:13:20 +1000 Subject: [PATCH 04/16] Delete vagrant test harness setup --- vagrant/create_topics.sh | 9 ---- vagrant/halt_cluster.sh | 25 ----------- vagrant/install_cluster.sh | 86 ------------------------------------ vagrant/kafka.conf | 9 ---- vagrant/provision.sh | 17 ------- vagrant/run_java_producer.sh | 6 --- vagrant/run_toxiproxy.sh | 22 --------- vagrant/setup_services.sh | 29 ------------ vagrant/toxiproxy.conf | 6 --- vagrant/zookeeper.conf | 7 --- vagrant/zookeeper.properties | 36 --------------- 11 files changed, 252 deletions(-) delete mode 100755 vagrant/create_topics.sh delete mode 100755 vagrant/halt_cluster.sh delete mode 100755 vagrant/install_cluster.sh delete mode 100644 vagrant/kafka.conf delete mode 100755 vagrant/provision.sh delete mode 100755 vagrant/run_java_producer.sh delete mode 100755 vagrant/run_toxiproxy.sh delete mode 100755 vagrant/setup_services.sh delete mode 100644 vagrant/toxiproxy.conf delete mode 100644 vagrant/zookeeper.conf delete mode 100644 vagrant/zookeeper.properties diff --git a/vagrant/create_topics.sh b/vagrant/create_topics.sh deleted file mode 100755 index 959d3a053..000000000 --- a/vagrant/create_topics.sh +++ /dev/null @@ -1,9 +0,0 @@ -#!/bin/sh - -set -ex - -cd ${KAFKA_INSTALL_ROOT}/kafka-9092 -bin/kafka-topics.sh --create 
--partitions 1 --replication-factor 3 --topic test.1 --zookeeper localhost:2181 -bin/kafka-topics.sh --create --partitions 4 --replication-factor 3 --topic test.4 --zookeeper localhost:2181 -bin/kafka-topics.sh --create --partitions 64 --replication-factor 3 --topic test.64 --zookeeper localhost:2181 -bin/kafka-topics.sh --create --partitions 1 --replication-factor 3 --topic uncommitted-topic-test-4 --zookeeper localhost:2181 \ No newline at end of file diff --git a/vagrant/halt_cluster.sh b/vagrant/halt_cluster.sh deleted file mode 100755 index e671c4812..000000000 --- a/vagrant/halt_cluster.sh +++ /dev/null @@ -1,25 +0,0 @@ -#!/bin/bash - -# If the functional tests failed (or some other task) then -# we might want to look into the broker logs -if [ "$TRAVIS_TEST_RESULT" = "1" ]; then - echo "Dumping Kafka brokers server.log:" - for i in 1 2 3 4 5; do - KAFKA_PORT=`expr $i + 9090` - sed -e "s/^/kafka-${KAFKA_PORT} /" ${KAFKA_INSTALL_ROOT}/kafka-${KAFKA_PORT}/logs/server.log{.*,} - done -fi - -set -ex - -for i in 1 2 3 4 5; do - KAFKA_PORT=`expr $i + 9090` - cd ${KAFKA_INSTALL_ROOT}/kafka-${KAFKA_PORT} && bin/kafka-server-stop.sh -done - -for i in 1 2 3 4 5; do - KAFKA_PORT=`expr $i + 9090` - cd ${KAFKA_INSTALL_ROOT}/kafka-${KAFKA_PORT} && bin/zookeeper-server-stop.sh -done - -killall toxiproxy diff --git a/vagrant/install_cluster.sh b/vagrant/install_cluster.sh deleted file mode 100755 index aa22261e4..000000000 --- a/vagrant/install_cluster.sh +++ /dev/null @@ -1,86 +0,0 @@ -#!/bin/sh - -set -ex - -TOXIPROXY_VERSION=2.1.4 - -mkdir -p ${KAFKA_INSTALL_ROOT} -if [ ! -f ${KAFKA_INSTALL_ROOT}/kafka-${KAFKA_VERSION}.tgz ]; then - wget --quiet https://archive.apache.org/dist/kafka/${KAFKA_VERSION}/kafka_${KAFKA_SCALA_VERSION}-${KAFKA_VERSION}.tgz -O ${KAFKA_INSTALL_ROOT}/kafka-${KAFKA_VERSION}.tgz -fi -if [ ! -f ${KAFKA_INSTALL_ROOT}/toxiproxy-${TOXIPROXY_VERSION} ]; then - wget --quiet https://github.com/Shopify/toxiproxy/releases/download/v${TOXIPROXY_VERSION}/toxiproxy-server-linux-amd64 -O ${KAFKA_INSTALL_ROOT}/toxiproxy-${TOXIPROXY_VERSION} - chmod +x ${KAFKA_INSTALL_ROOT}/toxiproxy-${TOXIPROXY_VERSION} -fi -rm -f ${KAFKA_INSTALL_ROOT}/toxiproxy -ln -s ${KAFKA_INSTALL_ROOT}/toxiproxy-${TOXIPROXY_VERSION} ${KAFKA_INSTALL_ROOT}/toxiproxy - -for i in 1 2 3 4 5; do - ZK_PORT=$((i + 2180)) - ZK_PORT_REAL=$((i + 21800)) - KAFKA_PORT=$((i + 9090)) - KAFKA_PORT_REAL=$((i + 29090)) - - # unpack kafka - mkdir -p ${KAFKA_INSTALL_ROOT}/kafka-${KAFKA_PORT} - tar xzf ${KAFKA_INSTALL_ROOT}/kafka-${KAFKA_VERSION}.tgz -C ${KAFKA_INSTALL_ROOT}/kafka-${KAFKA_PORT} --strip-components 1 - - # broker configuration - mkdir -p "${KAFKA_INSTALL_ROOT}/kafka-${KAFKA_PORT}/data" - - # Append to default server.properties with a small number of customisations - printf "\n\n" >> "${KAFKA_INSTALL_ROOT}/kafka-${KAFKA_PORT}/config/server.properties" - cat << EOF >> "${KAFKA_INSTALL_ROOT}/kafka-${KAFKA_PORT}/config/server.properties" -############################# Sarama Test Cluster ############################# - -broker.id=${KAFKA_PORT} -broker.rack=${i} - -# Listen on "real" port -listeners=PLAINTEXT://:${KAFKA_PORT_REAL} -# Advertise Toxiproxy port -advertised.listeners=PLAINTEXT://${KAFKA_HOSTNAME}:${KAFKA_PORT} - -# Connect to Zookeeper via Toxiproxy port -zookeeper.connect=127.0.0.1:${ZK_PORT} - -# Data directory -log.dirs="${KAFKA_INSTALL_ROOT}/kafka-${KAFKA_PORT}/data" - -# Create new topics with a replication factor of 2 so failover can be tested -# more easily. 
-default.replication.factor=2 - -# Turn on log.retention.bytes to avoid filling up the VM's disk -log.retention.bytes=268435456 -log.segment.bytes=268435456 - -# Enable topic deletion and disable auto-creation -delete.topic.enable=true -auto.create.topics.enable=false - -# Lower the zookeeper timeouts so we don't have to wait forever for a node -# to die when we use toxiproxy to kill its zookeeper connection -zookeeper.session.timeout.ms=3000 -zookeeper.connection.timeout.ms=3000 - -# Disable broker ID length constraint -reserved.broker.max.id=10000 - -# Permit follower fetching (KIP-392) -replica.selector.class=org.apache.kafka.common.replica.RackAwareReplicaSelector - -############################################################################### -EOF - - # zookeeper configuration - cp ${REPOSITORY_ROOT}/vagrant/zookeeper.properties ${KAFKA_INSTALL_ROOT}/kafka-${KAFKA_PORT}/config/ - sed -i s/KAFKAID/${KAFKA_PORT}/g ${KAFKA_INSTALL_ROOT}/kafka-${KAFKA_PORT}/config/zookeeper.properties - sed -i s/ZK_PORT/${ZK_PORT_REAL}/g ${KAFKA_INSTALL_ROOT}/kafka-${KAFKA_PORT}/config/zookeeper.properties - - ZK_DATADIR="${KAFKA_INSTALL_ROOT}/zookeeper-${ZK_PORT}" - mkdir -p ${ZK_DATADIR} - sed -i s#ZK_DATADIR#${ZK_DATADIR}#g ${KAFKA_INSTALL_ROOT}/kafka-${KAFKA_PORT}/config/zookeeper.properties - - echo $i > ${KAFKA_INSTALL_ROOT}/zookeeper-${ZK_PORT}/myid -done diff --git a/vagrant/kafka.conf b/vagrant/kafka.conf deleted file mode 100644 index 25101df5a..000000000 --- a/vagrant/kafka.conf +++ /dev/null @@ -1,9 +0,0 @@ -start on started zookeeper-ZK_PORT -stop on stopping zookeeper-ZK_PORT - -# Use a script instead of exec (using env stanza leaks KAFKA_HEAP_OPTS from zookeeper) -script - sleep 2 - export KAFKA_HEAP_OPTS="-Xmx320m" - exec /opt/kafka-KAFKAID/bin/kafka-server-start.sh /opt/kafka-KAFKAID/config/server.properties -end script diff --git a/vagrant/provision.sh b/vagrant/provision.sh deleted file mode 100755 index 7f10de74a..000000000 --- a/vagrant/provision.sh +++ /dev/null @@ -1,17 +0,0 @@ -#!/bin/sh - -set -ex - -apt-get update -yes | apt-get install default-jre - -export KAFKA_INSTALL_ROOT=/opt -export KAFKA_HOSTNAME=192.168.100.67 -export KAFKA_VERSION=1.0.2 -export KAFKA_SCALA_VERSION=2.11 -export REPOSITORY_ROOT=/vagrant - -sh /vagrant/vagrant/install_cluster.sh -sh /vagrant/vagrant/setup_services.sh -sh /vagrant/vagrant/create_topics.sh -sh /vagrant/vagrant/run_java_producer.sh diff --git a/vagrant/run_java_producer.sh b/vagrant/run_java_producer.sh deleted file mode 100755 index 5851b7484..000000000 --- a/vagrant/run_java_producer.sh +++ /dev/null @@ -1,6 +0,0 @@ -#!/bin/sh - -set -ex - -wget https://github.com/FrancoisPoinsot/simplest-uncommitted-msg/releases/download/0.1/simplest-uncommitted-msg-0.1-jar-with-dependencies.jar -java -jar simplest-uncommitted-msg-0.1-jar-with-dependencies.jar -b ${KAFKA_HOSTNAME}:9092 -c 4 \ No newline at end of file diff --git a/vagrant/run_toxiproxy.sh b/vagrant/run_toxiproxy.sh deleted file mode 100755 index e52c00e7b..000000000 --- a/vagrant/run_toxiproxy.sh +++ /dev/null @@ -1,22 +0,0 @@ -#!/bin/sh - -set -ex - -${KAFKA_INSTALL_ROOT}/toxiproxy -port 8474 -host 0.0.0.0 & -PID=$! - -while ! 
nc -q 1 localhost 8474 […]

[… remainder of vagrant/run_toxiproxy.sh, vagrant/setup_services.sh,
vagrant/toxiproxy.conf, vagrant/zookeeper.conf, vagrant/zookeeper.properties …]

From: KJ Tsanaktsidis
Date: Sun, 10 May 2020 17:41:42 +1000
Subject: [PATCH 05/16] Run the linter across functional tests as well

---
 Makefile | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/Makefile b/Makefile
index 18981cec9..a05863480 100644
--- a/Makefile
+++ b/Makefile
@@ -21,7 +21,7 @@ fmt:
 	gofmt -s -l -w $(FILES) $(TESTS)
 
 lint:
-	golangci-lint run
+	GOFLAGS="-tags=functional" golangci-lint run
 
 test:
 	$(GOTEST) ./...

From a9e0b4f16371e7c1c065fcc91b0fc081157361d2 Mon Sep 17 00:00:00 2001
From: KJ Tsanaktsidis
Date: Sun, 10 May 2020 18:55:21 +1000
Subject: [PATCH 06/16] Make linter pass on functional tests

---
 functional_test.go | 32 ++++++++++++++++----------------
 1 file changed, 16 insertions(+), 16 deletions(-)

diff --git a/functional_test.go b/functional_test.go
index 0e0c9e216..2391725a4 100644
--- a/functional_test.go
+++ b/functional_test.go
@@ -5,7 +5,6 @@ package sarama
 import (
 	"context"
 	"fmt"
-	toxiproxy "github.com/Shopify/toxiproxy/client"
 	"io"
 	"log"
 	"net"
@@ -18,6 +17,8 @@ import (
 	"strings"
 	"testing"
 	"time"
+
+	toxiproxy "github.com/Shopify/toxiproxy/client"
 )
 
 const (
@@ -27,19 +28,19 @@ var (
 	testTopicDetails = map[string]*TopicDetail{
 		"test.1": {
-			NumPartitions: 1,
+			NumPartitions:     1,
 			ReplicationFactor: 3,
 		},
 		"test.4": {
-			NumPartitions: 4,
+			NumPartitions:     4,
 			ReplicationFactor: 3,
 		},
 		"test.64": {
-			NumPartitions: 64,
+			NumPartitions:     64,
 			ReplicationFactor: 3,
 		},
 		"uncommitted-topic-test-4": {
-			NumPartitions: 1,
+			NumPartitions:     1,
 			ReplicationFactor: 3,
 		},
 	}
@@ -80,10 +81,10 @@ func testMain(m *testing.M) int {
 	if !usingExisting {
 		err := prepareDockerTestEnvironment(ctx, &env)
 		if err != nil {
-			tearDownDockerTestEnvironment(ctx, &env)
+			_ = tearDownDockerTestEnvironment(ctx, &env)
 			panic(err)
 		}
-		defer tearDownDockerTestEnvironment(ctx, &env)
+		defer tearDownDockerTestEnvironment(ctx, &env) // nolint:errcheck
 	}
 	if err := prepareTestTopics(ctx, &env); err != nil {
 		panic(err)
 	}
@@ -126,7 +127,6 @@ func prepareDockerTestEnvironment(ctx context.Context, env *testEnvironment) err
 		return fmt.Errorf("don't know what confluent platform version to use for kafka %s", env.KafkaVersion)
 	}
 
-
 	c := exec.Command("docker-compose", "up", "-d")
 	c.Stdout = os.Stdout
 	c.Stderr = os.Stderr
@@ -143,14 +143,14 @@ func prepareDockerTestEnvironment(ctx context.Context, env *testEnvironment) err
 		proxyName := fmt.Sprintf("kafka%d", i)
 		proxy, err := env.ToxiproxyClient.CreateProxy(
 			proxyName,
-			fmt.Sprintf("0.0.0.0:%d", 29090 + i),
-			fmt.Sprintf("kafka-%d:%d", i, 29090 + i),
+			fmt.Sprintf("0.0.0.0:%d", 29090+i),
+			fmt.Sprintf("kafka-%d:%d", i, 29090+i),
 		)
 		if err != nil {
 			return fmt.Errorf("failed to create toxiproxy: %w", err)
 		}
 		env.Proxies[proxyName] = proxy
-		env.KafkaBrokerAddrs = append(env.KafkaBrokerAddrs, fmt.Sprintf("127.0.0.1:%d", 29090 + i))
+		env.KafkaBrokerAddrs = append(env.KafkaBrokerAddrs, fmt.Sprintf("127.0.0.1:%d", 29090+i))
 	}
 
@@ -175,7 +175,7 @@ func prepareDockerTestEnvironment(ctx context.Context, env *testEnvironment) err
 	brokersOk := make([]bool, len(env.KafkaBrokerAddrs))
 retryLoop:
 	for j, addr := range env.KafkaBrokerAddrs {
-		client, err := NewClient([]string{addr},config)
+		client, err := NewClient([]string{addr}, config)
 		if err != nil {
 			continue
 		}
@@ -267,7 +267,7 @@ func prepareTestTopics(ctx context.Context, env *testEnvironment) error {
 	Logger.Println("creating test topics")
 	var testTopicNames []string
-	for topic, _ := range testTopicDetails {
+	for topic := range testTopicDetails {
 		testTopicNames = append(testTopicNames, topic)
 	}
@@ -296,7 +296,7 @@ func prepareTestTopics(ctx context.Context, env *testEnvironment) error {
 
 	// Start by deleting the test topics (if they already exist)
 	deleteRes, err := controller.DeleteTopics(&DeleteTopicsRequest{
-		Topics: testTopicNames,
+		Topics:  testTopicNames,
 		Timeout: 30 * time.Second,
 	})
 	if err != nil {
@@ -334,7 +334,7 @@ func prepareTestTopics(ctx context.Context, env *testEnvironment) error {
 	// now create the topics empty
 	createRes, err := controller.CreateTopics(&CreateTopicsRequest{
 		TopicDetails: testTopicDetails,
-		Timeout: 30 * time.Second,
+		Timeout:      30 * time.Second,
 	})
 	if err != nil {
 		return fmt.Errorf("failed to create test topics: %w", err)
@@ -360,7 +360,7 @@ func prepareTestTopics(ctx context.Context, env *testEnvironment) error {
 			return fmt.Errorf("failed fetching the uncommitted msg jar: %w", err)
 		}
 		defer res.Body.Close()
-		jarFile, err := os.OpenFile(jarName, os.O_WRONLY | os.O_TRUNC | os.O_CREATE, 0644)
+		jarFile, err := os.OpenFile(jarName, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0644)
 		if err != nil {
 			return fmt.Errorf("failed opening the uncommitted msg jar: %w", err)
 		}

From bb1b6ac12b3fe0aa63c937dd56dec1585e85bab1 Mon Sep 17 00:00:00 2001
From: Diego Alvarez
Date: Wed, 17 Jun 2020 16:05:31 -0700
Subject: [PATCH 07/16] include zstd on the functional tests

---
 functional_producer_test.go | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/functional_producer_test.go b/functional_producer_test.go
index e589a8eb8..b6bac585b 100644
--- a/functional_producer_test.go
+++ b/functional_producer_test.go
@@ -31,6 +31,12 @@ func TestFuncProducingSnappy(t *testing.T) {
 	testProducingMessages(t, config)
 }
 
+func TestFuncProducingZstd(t *testing.T) {
+	config := NewConfig()
+	config.Producer.Compression = CompressionZSTD
+	testProducingMessages(t, config)
+}
+
 func TestFuncProducingNoResponse(t *testing.T) {
 	config := NewConfig()
 	config.Producer.RequiredAcks = NoResponse

From 0af1b4d7562155751a841806bf07e9397afe6a23 Mon Sep 17 00:00:00 2001
From: Diego Alvarez
Date: Thu, 18 Jun 2020 09:36:40 -0700
Subject: [PATCH 08/16] zstd needs client >= 2.1.0.0

---
 functional_producer_test.go | 1 +
 1 file changed, 1 insertion(+)

diff --git a/functional_producer_test.go b/functional_producer_test.go
index b6bac585b..9e153b0d9 100644
--- a/functional_producer_test.go
+++ b/functional_producer_test.go
@@ -33,6 +33,7 @@ func TestFuncProducingSnappy(t *testing.T) {
 
 func TestFuncProducingZstd(t *testing.T) {
 	config := NewConfig()
+	config.Version = V2_1_0_0
 	config.Producer.Compression = CompressionZSTD
 	testProducingMessages(t, config)
 }

From cfd5999fa195c5c7ae785e533166b06947bae7ce Mon Sep 17 00:00:00 2001
From: Konstantinos Tsanaktsidis
Date: Wed, 24 Jun 2020 13:51:24 +1000
Subject: [PATCH 09/16] Remove redundant hard-coding of env.KafkaVersion

---
 functional_test.go | 5 -----
 1 file changed, 5 deletions(-)

diff --git a/functional_test.go b/functional_test.go
index 2391725a4..b61a43daf 100644
--- a/functional_test.go
+++ b/functional_test.go
@@ -153,11 +153,6 @@ func prepareDockerTestEnvironment(ctx context.Context, env *testEnvironment) err
 		env.KafkaBrokerAddrs = append(env.KafkaBrokerAddrs, fmt.Sprintf("127.0.0.1:%d", 29090+i))
 	}
 
-	// the mapping of confluent platform docker image versions -> kafka versions can be
-	// found here: https://docs.confluent.io/current/installation/versions-interoperability.html
-	// We have cp-5.5.0 in the docker-compose file, so that's kafka 2.5.0.
-	env.KafkaVersion = "2.5.0"
-
 	// Wait for the kafka broker to come up
 	allBrokersUp := false
 	for i := 0; i < 45 && !allBrokersUp; i++ {

From cb9d1e8b85290f42c25bc72d120397ccfec3500a Mon Sep 17 00:00:00 2001
From: Wim Claeys
Date: Wed, 24 Jun 2020 09:04:25 +0100
Subject: [PATCH 10/16] Add support for manual commit to ConsumerGroup

- expose a `Commit()` sync method on ConsumerGroupSession
- don't create mainLoop in OffsetManager unless AutoCommit is enabled
---
 consumer_group.go      |  9 +++++++
 offset_manager.go      | 27 ++++++++++++--------
 offset_manager_test.go | 58 ++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 84 insertions(+), 10 deletions(-)

diff --git a/consumer_group.go b/consumer_group.go
index 056b9e387..aae6599ca 100644
--- a/consumer_group.go
+++ b/consumer_group.go
@@ -513,6 +513,11 @@ type ConsumerGroupSession interface {
 	// message twice, and your processing should ideally be idempotent.
 	MarkOffset(topic string, partition int32, offset int64, metadata string)
 
+	// Commit the offset to the backend
+	//
+	// Note: calling Commit performs a blocking synchronous operation.
+	Commit()
+
 	// ResetOffset resets to the provided offset, alongside a metadata string that
 	// represents the state of the partition consumer at that point in time. Reset
 	// acts as a counterpart to MarkOffset, the difference being that it allows to
@@ -624,6 +629,10 @@ func (s *consumerGroupSession) MarkOffset(topic string, partition int32, offset
 	}
 }
 
+func (s *consumerGroupSession) Commit() {
+	s.offsets.Commit()
+}
+
 func (s *consumerGroupSession) ResetOffset(topic string, partition int32, offset int64, metadata string) {
 	if pom := s.offsets.findPOM(topic, partition); pom != nil {
 		pom.ResetOffset(offset, metadata)
diff --git a/offset_manager.go b/offset_manager.go
index 19408729f..b4fea8226 100644
--- a/offset_manager.go
+++ b/offset_manager.go
@@ -19,6 +19,10 @@ type OffsetManager interface {
 	// will otherwise leak memory. You must call this after all the
 	// PartitionOffsetManagers are closed.
 	Close() error
+
+	// Commit commits the offsets. This method can be used if AutoCommit.Enable is
+	// set to false.
+	Commit()
 }
 
 type offsetManager struct {
@@ -58,7 +62,6 @@ func newOffsetManagerFromClient(group, memberID string, generation int32, client
 		client: client,
 		conf:   conf,
 		group:  group,
-		ticker: time.NewTicker(conf.Consumer.Offsets.AutoCommit.Interval),
 		poms:   make(map[string]map[int32]*partitionOffsetManager),
 
 		memberID:   memberID,
@@ -67,7 +70,10 @@ func newOffsetManagerFromClient(group, memberID string, generation int32, client
 		closing: make(chan none),
 		closed:  make(chan none),
 	}
-	go withRecover(om.mainLoop)
+	if conf.Consumer.Offsets.AutoCommit.Enable {
+		om.ticker = time.NewTicker(conf.Consumer.Offsets.AutoCommit.Interval)
+		go withRecover(om.mainLoop)
+	}
 
 	return om, nil
 }
@@ -99,7 +105,9 @@ func (om *offsetManager) Close() error {
 	om.closeOnce.Do(func() {
 		// exit the mainLoop
 		close(om.closing)
-		<-om.closed
+		if om.conf.Consumer.Offsets.AutoCommit.Enable {
+			<-om.closed
+		}
 
 		// mark all POMs as closed
 		om.asyncClosePOMs()
@@ -225,20 +233,19 @@ func (om *offsetManager) mainLoop() {
 	for {
 		select {
 		case <-om.ticker.C:
-			om.flushToBroker()
-			om.releasePOMs(false)
+			om.Commit()
 		case <-om.closing:
 			return
 		}
 	}
 }
 
-// flushToBroker is ignored if auto-commit offsets is disabled
-func (om *offsetManager) flushToBroker() {
-	if !om.conf.Consumer.Offsets.AutoCommit.Enable {
-		return
-	}
+func (om *offsetManager) Commit() {
+	om.flushToBroker()
+	om.releasePOMs(false)
+}
 
+func (om *offsetManager) flushToBroker() {
 	req := om.constructRequest()
 	if req == nil {
 		return
diff --git a/offset_manager_test.go b/offset_manager_test.go
index f1baa9cdb..5aa2ee0ff 100644
--- a/offset_manager_test.go
+++ b/offset_manager_test.go
@@ -169,6 +169,64 @@ func TestNewOffsetManagerOffsetsAutoCommit(t *testing.T) {
 	}
 }
 
+func TestNewOffsetManagerOffsetsManualCommit(t *testing.T) {
+	// Tests to validate configuration when `Consumer.Offsets.AutoCommit.Enable` is false
+	config := NewConfig()
+	config.Consumer.Offsets.AutoCommit.Enable = false
+
+	om, testClient, broker, coordinator := initOffsetManagerWithBackoffFunc(t, 0, nil, config)
+	pom := initPartitionOffsetManager(t, om, coordinator, 5, "original_meta")
+
+	// Wait long enough for the test not to fail..
+	timeout := 50 * config.Consumer.Offsets.AutoCommit.Interval
+
+	ocResponse := new(OffsetCommitResponse)
+	ocResponse.AddError("my_topic", 0, ErrNoError)
+	called := make(chan none)
+	handler := func(req *request) (res encoderWithHeader) {
+		close(called)
+		return ocResponse
+	}
+	coordinator.setHandler(handler)
+
+	// Should not trigger an auto-commit
+	expected := int64(1)
+	pom.ResetOffset(expected, "modified_meta")
+	_, _ = pom.NextOffset()
+
+	select {
+	case <-called:
+		// OffsetManager called on the wire.
+		t.Errorf("Received request when AutoCommit is disabled")
+	case <-time.After(timeout):
+		// Timeout waiting for OffsetManager to call on the wire.
+		// OK
+	}
+
+	// Setup again to test manual commit
+	called = make(chan none)
+
+	om.Commit()
+
+	select {
+	case <-called:
+		// OffsetManager called on the wire.
+		// OK
+	case <-time.After(timeout):
+		// Timeout waiting for OffsetManager to call on the wire.
+		t.Errorf("No request received for after waiting for %v", timeout)
+	}
+
+	// Close up
+	broker.Close()
+	coordinator.Close()
+
+	// !! om must be closed before the pom so pom.release() is called before pom.Close()
+	safeClose(t, om)
+	safeClose(t, pom)
+	safeClose(t, testClient)
+}
+
 // Test recovery from ErrNotCoordinatorForConsumer
 // on first fetchInitialOffset call
 func TestOffsetManagerFetchInitialFail(t *testing.T) {

From 5d05ad69f7a867433db72fca64ae3272c29fbcb6 Mon Sep 17 00:00:00 2001
From: Diego Alvarez
Date: Thu, 11 Jun 2020 16:36:13 -0700
Subject: [PATCH 11/16] KIP-42 Add producer and consumer interceptors
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

This PR includes:
Producer: `onSend` but it doesn't implement `onAcknowledgement`
Consumer: `onConsume` but it doesn't implement `onCommit`

I'm not sure if I need to add the `onClose` method. Maybe in another
iteration ¯\_(ツ)_/¯
---
 async_producer.go           |   4 ++
 async_producer_test.go      | 121 ++++++++++++++++++++++++++++++++++++
 config.go                   |  16 +++++
 consumer.go                 |   3 +
 consumer_test.go            | 110 ++++++++++++++++++++++++++++++++
 functional_producer_test.go |  73 ++++++++++++++++++++++
 interceptors.go             |  43 +++++++++++++
 7 files changed, 370 insertions(+)
 create mode 100644 interceptors.go

diff --git a/async_producer.go b/async_producer.go
index d0ce01b66..f1ffc8f92 100644
--- a/async_producer.go
+++ b/async_producer.go
@@ -348,6 +348,10 @@ func (p *asyncProducer) dispatcher() {
 			p.inFlight.Add(1)
 		}
 
+		for _, interceptor := range p.conf.Producer.Interceptors {
+			msg.safelyApplyInterceptor(interceptor)
+		}
+
 		version := 1
 		if p.conf.Version.IsAtLeast(V0_11_0_0) {
 			version = 2
diff --git a/async_producer_test.go b/async_producer_test.go
index 46b97790a..338ca3656 100644
--- a/async_producer_test.go
+++ b/async_producer_test.go
@@ -5,6 +5,7 @@ import (
 	"log"
 	"os"
 	"os/signal"
+	"strconv"
 	"sync"
 	"sync/atomic"
 	"testing"
@@ -1230,6 +1231,126 @@ func TestBrokerProducerShutdown(t *testing.T) {
 	mockBroker.Close()
 }
 
+type appendInterceptor struct {
+	i int
+}
+
+func (b *appendInterceptor) onSend(msg *ProducerMessage) {
+	if b.i < 0 {
+		panic("hey, the interceptor have failed")
+	}
+	v, _ := msg.Value.Encode()
+	msg.Value = StringEncoder(string(v) + strconv.Itoa(b.i))
+	b.i++
+}
+
+func (b *appendInterceptor) onConsume(msg *ConsumerMessage) {
+	if b.i < 0 {
+		panic("hey, the interceptor have failed")
+	}
+	msg.Value = []byte(string(msg.Value) + strconv.Itoa(b.i))
+	b.i++
+}
+
+func testProducerInterceptor(
+	t *testing.T,
+	interceptors []ProducerInterceptor,
+	expectationFn func(*testing.T, int, *ProducerMessage),
+) {
+	seedBroker := NewMockBroker(t, 1)
+	leader := NewMockBroker(t, 2)
+	metadataLeader := new(MetadataResponse)
+	metadataLeader.AddBroker(leader.Addr(), leader.BrokerID())
+	metadataLeader.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, nil, ErrNoError)
+	seedBroker.Returns(metadataLeader)
+
+	config := NewConfig()
+	config.Producer.Flush.Messages = 10
+	config.Producer.Return.Successes = true
+	config.Producer.Interceptors = interceptors
+	producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	for i := 0; i < 10; i++ {
+		producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
+	}
+
+	prodSuccess := new(ProduceResponse)
+	prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
+	leader.Returns(prodSuccess)
+
+	for i := 0; i < 10; i++ {
+		select {
+		case msg := <-producer.Errors():
+			t.Error(msg.Err)
+		case msg := <-producer.Successes():
+			expectationFn(t, i, msg)
+		}
+	}
+
+	closeProducer(t, producer)
+	leader.Close()
+	seedBroker.Close()
+}
+
+func TestAsyncProducerInterceptors(t *testing.T) {
+	tests := []struct {
+		name          string
+		interceptors  []ProducerInterceptor
+		expectationFn func(*testing.T, int, *ProducerMessage)
+	}{
+		{
+			name:         "intercept messages",
+			interceptors: []ProducerInterceptor{&appendInterceptor{i: 0}},
+			expectationFn: func(t *testing.T, i int, msg *ProducerMessage) {
+				v, _ := msg.Value.Encode()
+				expected := TestMessage + strconv.Itoa(i)
+				if string(v) != expected {
+					t.Errorf("Interceptor should have incremented the value, got %s, expected %s", v, expected)
+				}
+			},
+		},
+		{
+			name:         "interceptor chain",
+			interceptors: []ProducerInterceptor{&appendInterceptor{i: 0}, &appendInterceptor{i: 1000}},
+			expectationFn: func(t *testing.T, i int, msg *ProducerMessage) {
+				v, _ := msg.Value.Encode()
+				expected := TestMessage + strconv.Itoa(i) + strconv.Itoa(i+1000)
+				if string(v) != expected {
+					t.Errorf("Interceptor should have incremented the value, got %s, expected %s", v, expected)
+				}
+			},
+		},
+		{
+			name:         "interceptor chain with one interceptor failing",
+			interceptors: []ProducerInterceptor{&appendInterceptor{i: -1}, &appendInterceptor{i: 1000}},
+			expectationFn: func(t *testing.T, i int, msg *ProducerMessage) {
+				v, _ := msg.Value.Encode()
+				expected := TestMessage + strconv.Itoa(i+1000)
+				if string(v) != expected {
+					t.Errorf("Interceptor should have incremented the value, got %s, expected %s", v, expected)
+				}
+			},
+		},
+		{
+			name:         "interceptor chain with all interceptors failing",
+			interceptors: []ProducerInterceptor{&appendInterceptor{i: -1}, &appendInterceptor{i: -1}},
+			expectationFn: func(t *testing.T, i int, msg *ProducerMessage) {
+				v, _ := msg.Value.Encode()
+				expected := TestMessage
+				if string(v) != expected {
+					t.Errorf("Interceptor should not have changed the value, got %s, expected %s", v, expected)
+				}
+			},
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) { testProducerInterceptor(t, tt.interceptors, tt.expectationFn) })
+	}
+}
+
 // This example shows how to use the producer while simultaneously
 // reading the Errors channel to know about any failures.
 func ExampleAsyncProducer_select() {
diff --git a/config.go b/config.go
index 0ce308f80..b346c58a1 100644
--- a/config.go
+++ b/config.go
@@ -229,6 +229,14 @@ type Config struct {
 			// `Backoff` if set.
 			BackoffFunc func(retries, maxRetries int) time.Duration
 		}
+
+		// Interceptors to be called when the producer dispatcher reads the
+		// message for the first time. Interceptors allow you to intercept and
+		// possibly mutate the message before it is published to the Kafka
+		// cluster. The *ProducerMessage modified by the first interceptor's
+		// onSend() is passed to the second interceptor onSend(), and so on in
+		// the interceptor chain.
+		Interceptors []ProducerInterceptor
 	}
 
 	// Consumer is the namespace for configuration related to consuming messages,
@@ -391,6 +399,14 @@ type Config struct {
 		// - use `ReadUncommitted` (default) to consume and return all messages in message channel
 		// - use `ReadCommitted` to hide messages that are part of an aborted transaction
 		IsolationLevel IsolationLevel
+
+		// Interceptors to be called just before the record is sent to the
+		// messages channel. Interceptors allow you to intercept and possibly
+		// mutate the message before it is returned to the client. The
+		// *ConsumerMessage modified by the first interceptor's onConsume() is
+		// passed to the second interceptor onConsume(), and so on in the
+		// interceptor chain.
+		Interceptors []ConsumerInterceptor
 	}
 
 // A user-provided string sent with every request to the brokers for logging,
diff --git a/consumer.go b/consumer.go
index e16d08aa9..b0cdfc3a7 100644
--- a/consumer.go
+++ b/consumer.go
@@ -451,6 +451,9 @@ feederLoop:
 		}
 
 		for i, msg := range msgs {
+			for _, interceptor := range child.conf.Consumer.Interceptors {
+				msg.safelyApplyInterceptor(interceptor)
+			}
 		messageSelect:
 			select {
 			case <-child.dying:
diff --git a/consumer_test.go b/consumer_test.go
index d0617f2ab..230582e5f 100644
--- a/consumer_test.go
+++ b/consumer_test.go
@@ -5,6 +5,7 @@ import (
 	"os"
 	"os/signal"
 	"reflect"
+	"strconv"
 	"sync"
 	"sync/atomic"
 	"testing"
@@ -1342,3 +1343,112 @@ func Test_partitionConsumer_parseResponse(t *testing.T) {
 		})
 	}
 }
+
+func testConsumerInterceptor(
+	t *testing.T,
+	interceptors []ConsumerInterceptor,
+	expectationFn func(*testing.T, int, *ConsumerMessage),
+) {
+	// Given
+	broker0 := NewMockBroker(t, 0)
+
+	mockFetchResponse := NewMockFetchResponse(t, 1)
+	for i := 0; i < 10; i++ {
+		mockFetchResponse.SetMessage("my_topic", 0, int64(i), testMsg)
+	}
+
+	broker0.SetHandlerByMap(map[string]MockResponse{
+		"MetadataRequest": NewMockMetadataResponse(t).
+			SetBroker(broker0.Addr(), broker0.BrokerID()).
+			SetLeader("my_topic", 0, broker0.BrokerID()),
+		"OffsetRequest": NewMockOffsetResponse(t).
+			SetOffset("my_topic", 0, OffsetOldest, 0).
+			SetOffset("my_topic", 0, OffsetNewest, 0),
+		"FetchRequest": mockFetchResponse,
+	})
+	config := NewConfig()
+	config.Consumer.Interceptors = interceptors
+	// When
+	master, err := NewConsumer([]string{broker0.Addr()}, config)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	consumer, err := master.ConsumePartition("my_topic", 0, 0)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	for i := 0; i < 10; i++ {
+		select {
+		case msg := <-consumer.Messages():
+			expectationFn(t, i, msg)
+		case err := <-consumer.Errors():
+			t.Error(err)
+		}
+	}
+
+	safeClose(t, consumer)
+	safeClose(t, master)
+	broker0.Close()
+}
+
+func TestConsumerInterceptors(t *testing.T) {
+	tests := []struct {
+		name          string
+		interceptors  []ConsumerInterceptor
+		expectationFn func(*testing.T, int, *ConsumerMessage)
+	}{
+		{
+			name:         "intercept messages",
+			interceptors: []ConsumerInterceptor{&appendInterceptor{i: 0}},
+			expectationFn: func(t *testing.T, i int, msg *ConsumerMessage) {
+				ev, _ := testMsg.Encode()
+				expected := string(ev) + strconv.Itoa(i)
+				v := string(msg.Value)
+				if string(v) != expected {
+					t.Errorf("Interceptor should have incremented the value, got %s, expected %s", v, expected)
+				}
+			},
+		},
+		{
+			name:         "interceptor chain",
+			interceptors: []ConsumerInterceptor{&appendInterceptor{i: 0}, &appendInterceptor{i: 1000}},
+			expectationFn: func(t *testing.T, i int, msg *ConsumerMessage) {
+				ev, _ := testMsg.Encode()
+				expected := string(ev) + strconv.Itoa(i) + strconv.Itoa(i+1000)
+				v := string(msg.Value)
+				if string(v) != expected {
+					t.Errorf("Interceptor should have incremented the value, got %s, expected %s", v, expected)
+				}
+			},
+		},
+		{
+			name:         "interceptor chain with one interceptor failing",
+			interceptors: []ConsumerInterceptor{&appendInterceptor{i: -1}, &appendInterceptor{i: 1000}},
+			expectationFn: func(t *testing.T, i int, msg *ConsumerMessage) {
+				ev, _ := testMsg.Encode()
+				expected := string(ev) + strconv.Itoa(i+1000)
+				v := string(msg.Value)
+				if string(v) != expected {
+					t.Errorf("Interceptor should have incremented the value, got %s, expected %s", v, expected)
+				}
+			},
+		},
+		{
+			name:         "interceptor chain with all interceptors failing",
+			interceptors: []ConsumerInterceptor{&appendInterceptor{i: -1}, &appendInterceptor{i: -1}},
+			expectationFn: func(t *testing.T, i int, msg *ConsumerMessage) {
+				ev, _ := testMsg.Encode()
+				expected := string(ev)
+				v := string(msg.Value)
+				if string(v) != expected {
+					t.Errorf("Interceptor should not have changed the value, got %s, expected %s", v, expected)
+				}
+			},
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) { testConsumerInterceptor(t, tt.interceptors, tt.expectationFn) })
+	}
+}
diff --git a/functional_producer_test.go b/functional_producer_test.go
index a4ab5a1b3..f6cc349e0 100644
--- a/functional_producer_test.go
+++ b/functional_producer_test.go
@@ -5,6 +5,7 @@ package sarama
 import (
 	"fmt"
 	"os"
+	"strconv"
 	"strings"
 	"sync"
 	"testing"
@@ -183,6 +184,78 @@ func TestFuncProducingIdempotentWithBrokerFailure(t *testing.T) {
 	}
 }
 
+func TestInterceptors(t *testing.T) {
+	config := NewConfig()
+	setupFunctionalTest(t)
+	defer teardownFunctionalTest(t)
+
+	config.Producer.Return.Successes = true
+	config.Consumer.Return.Errors = true
+	config.Producer.Interceptors = []ProducerInterceptor{&appendInterceptor{i: 0}, &appendInterceptor{i: 100}}
+	config.Consumer.Interceptors = []ConsumerInterceptor{&appendInterceptor{i: 20}}
+
+	client, err := NewClient(kafkaBrokers, config)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	initialOffset, err := client.GetOffset("test.1", 0, OffsetNewest)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	producer, err := NewAsyncProducerFromClient(client)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	for i := 0; i < 10; i++ {
+		producer.Input() <- &ProducerMessage{Topic: "test.1", Key: nil, Value: StringEncoder(TestMessage)}
+	}
+
+	for i := 0; i < 10; i++ {
+		select {
+		case msg := <-producer.Errors():
+			t.Error(msg.Err)
+		case msg := <-producer.Successes():
+			v, _ := msg.Value.Encode()
+			expected := TestMessage + strconv.Itoa(i) + strconv.Itoa(i+100)
+			if string(v) != expected {
+				t.Errorf("Interceptor should have incremented the value, got %s, expected %s", v, expected)
+			}
+		}
+	}
+	safeClose(t, producer)
+
+	master, err := NewConsumerFromClient(client)
+	if err != nil {
+		t.Fatal(err)
+	}
+	consumer, err := master.ConsumePartition("test.1", 0, initialOffset)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	for i := 0; i < 10; i++ {
+		select {
+		case <-time.After(10 * time.Second):
+			t.Fatal("No more events received in the last 10 seconds.")
+		case err := <-consumer.Errors():
+			t.Error(err)
+		case msg := <-consumer.Messages():
+			// producer interceptors: strconv.Itoa(i) + strconv.Itoa(i+100)
+			// consumer interceptor: strconv.Itoa(i+20)
+			expected := TestMessage + strconv.Itoa(i) + strconv.Itoa(i+100) + strconv.Itoa(i+20)
+			v := string(msg.Value)
+			if string(v) != expected {
+				t.Errorf("Interceptor should have incremented the value, got %s, expected %s", v, expected)
+			}
+		}
+	}
+	safeClose(t, consumer)
+	safeClose(t, client)
+}
+
 func testProducingMessages(t *testing.T, config *Config) {
 	setupFunctionalTest(t)
 	defer teardownFunctionalTest(t)
diff --git a/interceptors.go b/interceptors.go
new file mode 100644
index 000000000..c6eaeda9d
--- /dev/null
+++ b/interceptors.go
@@ -0,0 +1,43 @@
+package sarama
+
+// ProducerInterceptor allows you to intercept (and possibly mutate) the records
+// received by the producer before they are published to the Kafka cluster.
+// https://cwiki.apache.org/confluence/display/KAFKA/KIP-42%3A+Add+Producer+and+Consumer+Interceptors#KIP42:AddProducerandConsumerInterceptors-Motivation
+type ProducerInterceptor interface {
+
+	// onSend is called when the producer message is intercepted. Please avoid
+	// modifying the message until it's safe to do so, as this is _not_ a copy
+	// of the message.
+	onSend(*ProducerMessage)
+}
+
+// ConsumerInterceptor allows you to intercept (and possibly mutate) the records
+// received by the consumer before they are sent to the messages channel.
+// https://cwiki.apache.org/confluence/display/KAFKA/KIP-42%3A+Add+Producer+and+Consumer+Interceptors#KIP42:AddProducerandConsumerInterceptors-Motivation
+type ConsumerInterceptor interface {
+
+	// onConsume is called when the consumed message is intercepted. Please
+	// avoid modifying the message until it's safe to do so, as this is _not_ a
+	// copy of the message.
+	onConsume(*ConsumerMessage)
+}
+
+func (msg *ProducerMessage) safelyApplyInterceptor(interceptor ProducerInterceptor) {
+	defer func() {
+		if r := recover(); r != nil {
+			Logger.Printf("Error when calling producer interceptor: %s, %v\n", interceptor, r)
+		}
+	}()
+
+	interceptor.onSend(msg)
+}
+
+func (msg *ConsumerMessage) safelyApplyInterceptor(interceptor ConsumerInterceptor) {
+	defer func() {
+		if r := recover(); r != nil {
+			Logger.Printf("Error when calling consumer interceptor: %s, %v\n", interceptor, r)
+		}
+	}()
+
+	interceptor.onConsume(msg)
+}

From aff81dac91bb27b4c4ab92333a7a1ab83360e7c3 Mon Sep 17 00:00:00 2001
From: Diego Alvarez
Date: Mon, 22 Jun 2020 12:44:43 -0700
Subject: [PATCH 12/16] make linter happy

---
 consumer_test.go            | 8 ++++----
 functional_producer_test.go | 2 +-
 2 files changed, 5 insertions(+), 5 deletions(-)

diff --git a/consumer_test.go b/consumer_test.go
index 230582e5f..2416b2c83 100644
--- a/consumer_test.go
+++ b/consumer_test.go
@@ -1406,7 +1406,7 @@ func TestConsumerInterceptors(t *testing.T) {
 				ev, _ := testMsg.Encode()
 				expected := string(ev) + strconv.Itoa(i)
 				v := string(msg.Value)
-				if string(v) != expected {
+				if v != expected {
 					t.Errorf("Interceptor should have incremented the value, got %s, expected %s", v, expected)
 				}
 			},
@@ -1418,7 +1418,7 @@ func TestConsumerInterceptors(t *testing.T) {
 				ev, _ := testMsg.Encode()
 				expected := string(ev) + strconv.Itoa(i) + strconv.Itoa(i+1000)
 				v := string(msg.Value)
-				if string(v) != expected {
+				if v != expected {
 					t.Errorf("Interceptor should have incremented the value, got %s, expected %s", v, expected)
 				}
 			},
@@ -1430,7 +1430,7 @@ func TestConsumerInterceptors(t *testing.T) {
 				ev, _ := testMsg.Encode()
 				expected := string(ev) + strconv.Itoa(i+1000)
 				v := string(msg.Value)
-				if string(v) != expected {
+				if v != expected {
 					t.Errorf("Interceptor should have incremented the value, got %s, expected %s", v, expected)
 				}
 			},
@@ -1442,7 +1442,7 @@ func TestConsumerInterceptors(t *testing.T) {
 				ev, _ := testMsg.Encode()
 				expected := string(ev)
 				v := string(msg.Value)
-				if string(v) != expected {
+				if v != expected {
 					t.Errorf("Interceptor should not have changed the value, got %s, expected %s", v, expected)
 				}
 			},
diff --git a/functional_producer_test.go b/functional_producer_test.go
index f6cc349e0..36387a438 100644
--- a/functional_producer_test.go
+++ b/functional_producer_test.go
@@ -247,7 +247,7 @@ func TestInterceptors(t *testing.T) {
 			// consumer interceptor: strconv.Itoa(i+20)
 			expected := TestMessage + strconv.Itoa(i) + strconv.Itoa(i+100) + strconv.Itoa(i+20)
 			v := string(msg.Value)
-			if string(v) != expected {
+			if v != expected {
 				t.Errorf("Interceptor should have incremented the value, got %s, expected %s", v, expected)
 			}
 		}

From 2beebdf4e03833f0db9e579850c76ae8b705991f Mon Sep 17 00:00:00 2001
From: Diego Alvarez
Date: Mon, 22 Jun 2020 14:41:39 -0700
Subject: [PATCH 13/16] remove code comments

---
 functional_producer_test.go | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/functional_producer_test.go b/functional_producer_test.go
index 36387a438..249e7fd97 100644
--- a/functional_producer_test.go
+++ b/functional_producer_test.go
@@ -243,9 +243,9 @@ func TestInterceptors(t *testing.T) {
 		case err := <-consumer.Errors():
 			t.Error(err)
 		case msg := <-consumer.Messages():
-			// producer interceptors: strconv.Itoa(i) + strconv.Itoa(i+100)
-			// consumer interceptor: strconv.Itoa(i+20)
-			expected := TestMessage + strconv.Itoa(i) + strconv.Itoa(i+100) + strconv.Itoa(i+20)
+			prodInteExpectation := strconv.Itoa(i) + strconv.Itoa(i+100)
+			consInteExpectation := strconv.Itoa(i + 20)
+			expected := TestMessage + prodInteExpectation + consInteExpectation
 			v := string(msg.Value)
 			if v != expected {
 				t.Errorf("Interceptor should have incremented the value, got %s, expected %s", v, expected)

From 76f08b854897ac33305081e65319c33759988e57 Mon Sep 17 00:00:00 2001
From: Diego Alvarez
Date: Tue, 23 Jun 2020 09:47:24 -0700
Subject: [PATCH 14/16] use the third person singular present tense s/have/has

---
 async_producer_test.go | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/async_producer_test.go b/async_producer_test.go
index 338ca3656..7b266a971 100644
--- a/async_producer_test.go
+++ b/async_producer_test.go
@@ -1237,7 +1237,7 @@ type appendInterceptor struct {
 
 func (b *appendInterceptor) onSend(msg *ProducerMessage) {
 	if b.i < 0 {
-		panic("hey, the interceptor have failed")
+		panic("hey, the interceptor has failed")
 	}
 	v, _ := msg.Value.Encode()
 	msg.Value = StringEncoder(string(v) + strconv.Itoa(b.i))
@@ -1246,7 +1246,7 @@ func (b *appendInterceptor) onSend(msg *ProducerMessage) {
 
 func (b *appendInterceptor) onConsume(msg *ConsumerMessage) {
 	if b.i < 0 {
-		panic("hey, the interceptor have failed")
+		panic("hey, the interceptor has failed")
 	}
 	msg.Value = []byte(string(msg.Value) + strconv.Itoa(b.i))
 	b.i++

From a29994860e1bf533be55f877ad9fd7b78e9b5b6d Mon Sep 17 00:00:00 2001
From: Diego Alvarez
Date: Wed, 24 Jun 2020 09:30:00 -0700
Subject: [PATCH 15/16] export interceptor methods

---
 async_producer_test.go |  4 ++--
 config.go              |  6 +++---
 interceptors.go        | 12 ++++++------
 3 files changed, 11 insertions(+), 11 deletions(-)

diff --git a/async_producer_test.go b/async_producer_test.go
index 7b266a971..0c6ddb696 100644
--- a/async_producer_test.go
+++ b/async_producer_test.go
@@ -1235,7 +1235,7 @@ type appendInterceptor struct {
 	i int
 }
 
-func (b *appendInterceptor) onSend(msg *ProducerMessage) {
+func (b *appendInterceptor) OnSend(msg *ProducerMessage) {
 	if b.i < 0 {
 		panic("hey, the interceptor has failed")
 	}
@@ -1244,7 +1244,7 @@ func (b *appendInterceptor) onSend(msg *ProducerMessage) {
 	b.i++
 }
 
-func (b *appendInterceptor) onConsume(msg *ConsumerMessage) {
+func (b *appendInterceptor) OnConsume(msg *ConsumerMessage) {
 	if b.i < 0 {
 		panic("hey, the interceptor has failed")
 	}
diff --git a/config.go b/config.go
index b346c58a1..9b7ce7aeb 100644
--- a/config.go
+++ b/config.go
@@ -234,7 +234,7 @@ type Config struct {
 		// message for the first time. Interceptors allow you to intercept and
 		// possibly mutate the message before it is published to the Kafka
 		// cluster. The *ProducerMessage modified by the first interceptor's
-		// onSend() is passed to the second interceptor onSend(), and so on in
+		// OnSend() is passed to the second interceptor OnSend(), and so on in
 		// the interceptor chain.
 		Interceptors []ProducerInterceptor
 	}
@@ -403,8 +403,8 @@ type Config struct {
 		// Interceptors to be called just before the record is sent to the
 		// messages channel. Interceptors allow you to intercept and possibly
 		// mutate the message before it is returned to the client. The
-		// *ConsumerMessage modified by the first interceptor's onConsume() is
-		// passed to the second interceptor onConsume(), and so on in the
+		// *ConsumerMessage modified by the first interceptor's OnConsume() is
+		// passed to the second interceptor OnConsume(), and so on in the
 		// interceptor chain.
 		Interceptors []ConsumerInterceptor
 	}
diff --git a/interceptors.go b/interceptors.go
index c6eaeda9d..d0d33e526 100644
--- a/interceptors.go
+++ b/interceptors.go
@@ -5,10 +5,10 @@ package sarama
 // https://cwiki.apache.org/confluence/display/KAFKA/KIP-42%3A+Add+Producer+and+Consumer+Interceptors#KIP42:AddProducerandConsumerInterceptors-Motivation
 type ProducerInterceptor interface {
 
-	// onSend is called when the producer message is intercepted. Please avoid
+	// OnSend is called when the producer message is intercepted. Please avoid
 	// modifying the message until it's safe to do so, as this is _not_ a copy
 	// of the message.
-	onSend(*ProducerMessage)
+	OnSend(*ProducerMessage)
 }
 
 // ConsumerInterceptor allows you to intercept (and possibly mutate) the records
@@ -16,10 +16,10 @@ type ProducerInterceptor interface {
 // https://cwiki.apache.org/confluence/display/KAFKA/KIP-42%3A+Add+Producer+and+Consumer+Interceptors#KIP42:AddProducerandConsumerInterceptors-Motivation
 type ConsumerInterceptor interface {
 
-	// onConsume is called when the consumed message is intercepted. Please
+	// OnConsume is called when the consumed message is intercepted. Please
 	// avoid modifying the message until it's safe to do so, as this is _not_ a
 	// copy of the message.
-	onConsume(*ConsumerMessage)
+	OnConsume(*ConsumerMessage)
 }
 
 func (msg *ProducerMessage) safelyApplyInterceptor(interceptor ProducerInterceptor) {
@@ -29,7 +29,7 @@ func (msg *ProducerMessage) safelyApplyInterceptor(interceptor ProducerIntercept
 		}
 	}()
 
-	interceptor.onSend(msg)
+	interceptor.OnSend(msg)
 }
 
 func (msg *ConsumerMessage) safelyApplyInterceptor(interceptor ConsumerInterceptor) {
@@ -39,5 +39,5 @@ func (msg *ConsumerMessage) safelyApplyInterceptor(interceptor ConsumerIntercept
 		}
 	}()
 
-	interceptor.onConsume(msg)
+	interceptor.OnConsume(msg)
 }

From 899e856c5e00758b175a72590637fbb69bd9c18f Mon Sep 17 00:00:00 2001
From: Diego Alvarez
Date: Mon, 29 Jun 2020 09:15:54 -0700
Subject: [PATCH 16/16] fix renamed variable after merging latest master

---
 functional_producer_test.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/functional_producer_test.go b/functional_producer_test.go
index 249e7fd97..b83dc72e3 100644
--- a/functional_producer_test.go
+++ b/functional_producer_test.go
@@ -194,7 +194,7 @@ func TestInterceptors(t *testing.T) {
 	config.Producer.Interceptors = []ProducerInterceptor{&appendInterceptor{i: 0}, &appendInterceptor{i: 100}}
 	config.Consumer.Interceptors = []ConsumerInterceptor{&appendInterceptor{i: 20}}
 
-	client, err := NewClient(kafkaBrokers, config)
+	client, err := NewClient(FunctionalTestEnv.KafkaBrokerAddrs, config)
 	if err != nil {
 		t.Fatal(err)
 	}
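
Usage notes (illustrative sketches only, not part of the patch series): two
minimal examples of how the APIs introduced above might be called from client
code. The broker address, topic and group names, and the headerSetter type are
hypothetical, and error handling is abbreviated.

Manual offset commits with auto-commit disabled, via the Commit() method the
offset manager patch adds to OffsetManager:

package main

import (
	"log"

	"github.com/Shopify/sarama"
)

func main() {
	config := sarama.NewConfig()
	// With auto-commit disabled the offset manager starts no ticker
	// goroutine; offsets only reach the group coordinator when Commit()
	// is called explicitly.
	config.Consumer.Offsets.AutoCommit.Enable = false

	client, err := sarama.NewClient([]string{"localhost:9092"}, config)
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	om, err := sarama.NewOffsetManagerFromClient("example-group", client)
	if err != nil {
		log.Fatal(err)
	}
	pom, err := om.ManagePartition("example-topic", 0)
	if err != nil {
		log.Fatal(err)
	}

	pom.MarkOffset(42, "processed batch 1")
	om.Commit() // flush the marked offset to the broker

	// Close the offset manager before the partition offset manager,
	// mirroring the ordering noted in the manual-commit test above.
	_ = om.Close()
	_ = pom.Close()
}

Registering a producer interceptor, using the exported OnSend form from the
final patch (setting Version to V0_11_0_0 so record headers are supported):

package main

import (
	"log"

	"github.com/Shopify/sarama"
)

// headerSetter is a hypothetical ProducerInterceptor that stamps every
// outgoing message with an origin header; OnSend mutates the message in place.
type headerSetter struct{}

func (h *headerSetter) OnSend(msg *sarama.ProducerMessage) {
	msg.Headers = append(msg.Headers, sarama.RecordHeader{
		Key:   []byte("origin"),
		Value: []byte("my-service"),
	})
}

func main() {
	config := sarama.NewConfig()
	config.Version = sarama.V0_11_0_0
	config.Producer.Return.Successes = true
	// Interceptors run in slice order inside the producer dispatcher;
	// a panicking interceptor is recovered and logged rather than fatal.
	config.Producer.Interceptors = []sarama.ProducerInterceptor{&headerSetter{}}

	producer, err := sarama.NewAsyncProducer([]string{"localhost:9092"}, config)
	if err != nil {
		log.Fatal(err)
	}
	defer producer.Close()

	producer.Input() <- &sarama.ProducerMessage{
		Topic: "example-topic",
		Value: sarama.StringEncoder("hello"),
	}
	<-producer.Successes()
}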