Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(test): make it simpler to re-use toxiproxy #2122

Merged
merged 1 commit into from
Jan 31, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 29 additions & 33 deletions functional_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"fmt"
"io"
"log"
"net"
"net/http"
"net/url"
"os"
Expand Down Expand Up @@ -101,6 +100,31 @@ type testEnvironment struct {
KafkaVersion string
}

// setupToxiProxies will configure the toxiproxy proxies with routes for the
// kafka brokers if they don't already exist
func setupToxiProxies(env *testEnvironment, endpoint string) error {
env.ToxiproxyClient = toxiproxy.NewClient(endpoint)
env.Proxies = map[string]*toxiproxy.Proxy{}
env.KafkaBrokerAddrs = nil
for i := 1; i <= 5; i++ {
proxyName := fmt.Sprintf("kafka%d", i)
proxy, err := env.ToxiproxyClient.Proxy(proxyName)
if err != nil {
proxy, err = env.ToxiproxyClient.CreateProxy(
proxyName,
fmt.Sprintf("0.0.0.0:%d", 29090+i),
fmt.Sprintf("kafka-%d:%d", i, 29090+i),
)
if err != nil {
return fmt.Errorf("failed to create toxiproxy: %w", err)
}
}
env.Proxies[proxyName] = proxy
env.KafkaBrokerAddrs = append(env.KafkaBrokerAddrs, fmt.Sprintf("127.0.0.1:%d", 29090+i))
}
return nil
}

func prepareDockerTestEnvironment(ctx context.Context, env *testEnvironment) error {
Logger.Println("bringing up docker-based test environment")

Expand Down Expand Up @@ -139,21 +163,8 @@ func prepareDockerTestEnvironment(ctx context.Context, env *testEnvironment) err
return fmt.Errorf("failed to run docker-compose to start test environment: %w", err)
}

// Set up toxiproxy Proxies
env.ToxiproxyClient = toxiproxy.NewClient("localhost:8474")
env.Proxies = map[string]*toxiproxy.Proxy{}
for i := 1; i <= 5; i++ {
proxyName := fmt.Sprintf("kafka%d", i)
proxy, err := env.ToxiproxyClient.CreateProxy(
proxyName,
fmt.Sprintf("0.0.0.0:%d", 29090+i),
fmt.Sprintf("kafka-%d:%d", i, 29090+i),
)
if err != nil {
return fmt.Errorf("failed to create toxiproxy: %w", err)
}
env.Proxies[proxyName] = proxy
env.KafkaBrokerAddrs = append(env.KafkaBrokerAddrs, fmt.Sprintf("127.0.0.1:%d", 29090+i))
if err := setupToxiProxies(env, "http://localhost:8474"); err != nil {
return fmt.Errorf("failed to setup toxiproxies: %w", err)
}

// Wait for the kafka broker to come up
Expand Down Expand Up @@ -218,23 +229,8 @@ func existingEnvironment(ctx context.Context, env *testEnvironment) (bool, error
if err != nil {
return false, fmt.Errorf("$TOXIPROXY_ADDR not parseable as url")
}
toxiproxyHost := toxiproxyURL.Hostname()

env.ToxiproxyClient = toxiproxy.NewClient(toxiproxyAddr)
env.Proxies = map[string]*toxiproxy.Proxy{}
for i := 1; i <= 5; i++ {
proxyName := fmt.Sprintf("kafka%d", i)
proxy, err := env.ToxiproxyClient.Proxy(proxyName)
if err != nil {
return false, fmt.Errorf("no proxy kafka%d on toxiproxy: %w", i, err)
}
env.Proxies[proxyName] = proxy
// get the host:port from the proxy & toxiproxy addr, so we can do "$toxiproxy_addr:$proxy_port"
_, proxyPort, err := net.SplitHostPort(proxy.Listen)
if err != nil {
return false, fmt.Errorf("proxy.Listen not a host:port combo: %w", err)
}
env.KafkaBrokerAddrs = append(env.KafkaBrokerAddrs, net.JoinHostPort(toxiproxyHost, proxyPort))
if err := setupToxiProxies(env, toxiproxyURL.String()); err != nil {
return false, fmt.Errorf("failed to setup toxiproxies: %w", err)
}

env.KafkaVersion, ok = os.LookupEnv("KAFKA_VERSION")
Expand Down