Skip to content

Commit

Permalink
add timeout for admin requests
Browse files Browse the repository at this point in the history
  • Loading branch information
d-rk committed Dec 8, 2020
1 parent 8309d18 commit 8be68c2
Show file tree
Hide file tree
Showing 6 changed files with 17 additions and 0 deletions.
1 change: 1 addition & 0 deletions .github/workflows/lint_test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ jobs:
env:
CP_VERSION: ${{ matrix.cp_version }}
KAFKAVERSION: ${{ matrix.kafka_version }}
TIMEOUT: 30
steps:
- uses: actions/checkout@v2
- uses: actions/setup-go@v2
Expand Down
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
Note that kafka >= 2.4.0.0 is required, otherwise the relevant api calls are not available.
- Added command `alter partition` which currently only enables to manually assign broker replicas to a partition.
Note that kafka >= 2.4.0.0 is required, otherwise the relevant api calls are not available.
- Added `timeout` config to control timeout of admin requests.

## 1.13.3 - 2020-11-11

Expand Down
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,9 @@ contexts:
# optional: kafkaVersion (defaults to 2.0.0)
kafkaVersion: 1.1.1

# optional: timeout for admin requests (defaults to 3s)
timeout: 10

# optional: avro schema registry
avro:
schemaRegistry: localhost:8081
Expand Down
1 change: 1 addition & 0 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ var cfgFile string
var Verbose bool

var envMapping = map[string]string{
"TIMEOUT": "CONTEXTS_DEFAULT_TIMEOUT",
"BROKERS": "CONTEXTS_DEFAULT_BROKERS",
"TLS_ENABLED": "CONTEXTS_DEFAULT_TLS_ENABLED",
"TLS_CA": "CONTEXTS_DEFAULT_TLS_CA",
Expand Down
2 changes: 2 additions & 0 deletions cmd/root_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ func TestEnvironmentVariableLoadingAliases(t *testing.T) {

test_util.StartUnitTest(t)

_ = os.Setenv("TIMEOUT", "30")
_ = os.Setenv("BROKERS", "broker1:9092 broker2:9092")
_ = os.Setenv("TLS_ENABLED", "true")
_ = os.Setenv("TLS_CA", "my-ca")
Expand All @@ -65,6 +66,7 @@ func TestEnvironmentVariableLoadingAliases(t *testing.T) {

test_util.AssertEquals(t, "broker1:9092", viper.GetStringSlice("contexts.default.brokers")[0])
test_util.AssertEquals(t, "broker2:9092", viper.GetStringSlice("contexts.default.brokers")[1])
test_util.AssertEquals(t, "30", viper.GetString("contexts.default.timeout"))
test_util.AssertEquals(t, "true", viper.GetString("contexts.default.tls.enabled"))
test_util.AssertEquals(t, "my-ca", viper.GetString("contexts.default.tls.ca"))
test_util.AssertEquals(t, "my-cert", viper.GetString("contexts.default.tls.cert"))
Expand Down
9 changes: 9 additions & 0 deletions operations/common-operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"os/user"
"regexp"
"strings"
"time"
)

var (
Expand Down Expand Up @@ -45,6 +46,7 @@ type ClientContext struct {
Tls TlsConfig
Sasl SaslConfig
Kubernetes K8sConfig
Timeout time.Duration
ClientID string
KafkaVersion sarama.KafkaVersion
AvroSchemaRegistry string
Expand Down Expand Up @@ -75,6 +77,8 @@ func CreateClientContext() (ClientContext, error) {
context.Tls.Insecure = viper.GetBool("contexts." + context.Name + ".tls.insecure")
context.ClientID = viper.GetString("contexts." + context.Name + ".clientID")

context.Timeout = viper.GetDuration("contexts." + context.Name + ".timeout")

if version, err := kafkaVersion(viper.GetString("contexts." + context.Name + ".kafkaVersion")); err == nil {
context.KafkaVersion = version
} else {
Expand Down Expand Up @@ -111,6 +115,11 @@ func CreateClientConfig(context *ClientContext) (*sarama.Config, error) {
config.Version = context.KafkaVersion
config.ClientID = getClientID(context)

if context.Timeout > 0 {
output.Debugf("using timeout: %d", context.Timeout)
config.Admin.Timeout = context.Timeout
}

if context.Tls.Enabled {
output.Debugf("TLS is enabled.")
config.Net.TLS.Enable = true
Expand Down

0 comments on commit 8be68c2

Please sign in to comment.