Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add replication-factor flag to alter topic #37

Merged
merged 10 commits into from
Dec 8, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 14 additions & 3 deletions .github/workflows/lint_test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,21 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
# cp-version 5.5.x corresponds to kafka version 2.5.x
cp-version: [5.5.0, 5.4.2, 5.3.3]
# https://docs.confluent.io/platform/current/installation/versions-interoperability.html
kafka_version: [2.6.0, 2.5.0, 2.4.0, 2.3.0]
include:
- kafka_version: 2.6.0
cp_version: 6.0.1
- kafka_version: 2.5.0
cp_version: 5.5.2
- kafka_version: 2.4.0
cp_version: 5.4.3
- kafka_version: 2.3.0
cp_version: 5.3.3
env:
CP_VERSION: ${{ matrix.cp-version }}
CP_VERSION: ${{ matrix.cp_version }}
KAFKAVERSION: ${{ matrix.kafka_version }}
REQUESTTIMEOUT: 15s
steps:
- uses: actions/checkout@v2
- uses: actions/setup-go@v2
Expand Down
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Added
- Add parameter `--replication-factor` to `alter topic` command which allows changing the replication factor of a topic.
Note that kafka >= 2.4.0.0 is required, otherwise the relevant api calls are not available.
- Added command `alter partition` which currently only enables to manually assign broker replicas to a partition.
Note that kafka >= 2.4.0.0 is required, otherwise the relevant api calls are not available.
- Added `requestTimeout` config to control timeout of admin requests.

## 1.13.3 - 2020-11-11

### Fixed
Expand Down
24 changes: 23 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,9 @@ contexts:
# optional: kafkaVersion (defaults to 2.0.0)
kafkaVersion: 1.1.1

# optional: timeout for admin requests (defaults to 3s)
requestTimeout: 10s

# optional: avro schema registry
avro:
schemaRegistry: localhost:8081
Expand Down Expand Up @@ -404,18 +407,37 @@ An additional parameter `print-schema` can be provided to display the schema use

### Altering topics

Using the `alter topic` command allows you to change the partition count and topic-level configurations of an existing topic.
Using the `alter topic` command allows you to change the partition count, replication factor and topic-level
configurations of an existing topic.

The partition count can be increased with:
```bash
kafkactl alter topic my-topic --partitions 32
```

The replication factor can be altered with:
```bash
kafkactl alter topic my-topic --replication-factor 2
```

> :information_source: when altering replication factor, kafkactl tries to keep the number of replicas assigned to each
> broker balanced. If you need more control over the assigned replicas use `alter partition` directly.

The topic configs can be edited by supplying key value pairs as follows:
```bash
kafkactl alter topic my-topic --config retention.ms=3600 --config cleanup.policy=compact
```

> :bulb: use the flag `--validate-only` to perform a dry-run without actually modifying the topic

### Altering partitions

The assigned replicas of a partition can directly be altered with:
```bash
# set brokers 102,103 as replicas for partition 3 of topic my-topic
kafkactl alter topic my-topic 3 -r 102,103
```

### Consumer groups

In order to get a list of consumer groups the `get consumer-groups` command can be used:
Expand Down
59 changes: 59 additions & 0 deletions cmd/alter/alter-partition.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package alter

import (
"github.com/deviceinsight/kafkactl/cmd/validation"
"github.com/deviceinsight/kafkactl/operations"
"github.com/deviceinsight/kafkactl/operations/k8s"
"github.com/deviceinsight/kafkactl/operations/partitions"
"github.com/deviceinsight/kafkactl/output"
"github.com/pkg/errors"
"github.com/spf13/cobra"
"strconv"
)

func newAlterPartitionCmd() *cobra.Command {

var flags partitions.AlterPartitionFlags

var cmdAlterPartition = &cobra.Command{
Use: "partition TOPIC PARTITION",
Short: "alter a partition",
Args: cobra.ExactArgs(2),
Run: func(cmd *cobra.Command, args []string) {
if !(&k8s.K8sOperation{}).TryRun(cmd, args) {

var partition int32

if i, err := strconv.ParseInt(args[1], 10, 64); err != nil {
output.Fail(errors.Errorf("argument 2 needs to be a partition %s", args[1]))
} else {
partition = int32(i)
}

if err := (&partitions.PartitionOperation{}).AlterPartition(args[0], partition, flags); err != nil {
output.Fail(err)
}
}
},
PreRunE: func(cmd *cobra.Command, args []string) error {
return validation.ValidateAtLeastOneRequiredFlag(cmd)
},
ValidArgsFunction: func(cmd *cobra.Command, args []string, toComplete string) ([]string, cobra.ShellCompDirective) {
if len(args) == 0 {
return operations.CompleteTopicNames(cmd, args, toComplete)
} else if len(args) == 1 {
return partitions.CompletePartitionIds(cmd, args, toComplete)
} else {
return nil, cobra.ShellCompDirectiveNoFileComp
}
},
}

cmdAlterPartition.Flags().Int32SliceVarP(&flags.Replicas, "replicas", "r", nil, "set replicas for a partition")
cmdAlterPartition.Flags().BoolVarP(&flags.ValidateOnly, "validate-only", "v", false, "validate only")

if err := validation.MarkFlagAtLeastOneRequired(cmdAlterPartition.Flags(), "replicas"); err != nil {
panic(err)
}
return cmdAlterPartition
}
99 changes: 99 additions & 0 deletions cmd/alter/alter-partition_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package alter_test

import (
"github.com/Rican7/retry"
"github.com/Rican7/retry/backoff"
"github.com/Rican7/retry/strategy"
"github.com/deviceinsight/kafkactl/operations"
"github.com/deviceinsight/kafkactl/test_util"
"gopkg.in/errgo.v2/fmt/errors"
"strings"
"testing"
"time"
)

func TestAlterPartitionAutoCompletionIntegration(t *testing.T) {

test_util.StartIntegrationTest(t)

prefix := "alter-p-complete-"

topicName1 := test_util.CreateTopic(t, prefix+"a", "--partitions", "2")
topicName2 := test_util.CreateTopic(t, prefix+"b")
topicName3 := test_util.CreateTopic(t, prefix+"c")

kafkaCtl := test_util.CreateKafkaCtlCommand()
kafkaCtl.Verbose = false

if _, err := kafkaCtl.Execute("__complete", "alter", "partition", ""); err != nil {
t.Fatalf("failed to execute command: %v", err)
}

outputLines := strings.Split(strings.TrimSpace(kafkaCtl.GetStdOut()), "\n")

test_util.AssertContains(t, topicName1, outputLines)
test_util.AssertContains(t, topicName2, outputLines)
test_util.AssertContains(t, topicName3, outputLines)

if _, err := kafkaCtl.Execute("__complete", "alter", "partition", topicName1, ""); err != nil {
t.Fatalf("failed to execute command: %v", err)
}

outputLines = strings.Split(strings.TrimSpace(kafkaCtl.GetStdOut()), "\n")

test_util.AssertContains(t, "0", outputLines)
test_util.AssertContains(t, "1", outputLines)
}

func TestAlterPartitionReplicationFactorIntegration(t *testing.T) {

test_util.StartIntegrationTest(t)

prefix := "alter-p-replicas-"

topicName := test_util.CreateTopic(t, prefix, "--partitions", "2", "--replication-factor", "3")

kafkaCtl := test_util.CreateKafkaCtlCommand()

if _, err := kafkaCtl.Execute("alter", "partition", topicName, "0", "--replicas", "101,102"); err != nil {
test_util.AssertErrorContains(t, "version of API is not supported", err)
return
}

checkReplicas := func(attempt uint) error {
_, err := kafkaCtl.Execute("describe", "topic", topicName, "-o", "yaml")

if err != nil {
return err
} else {
topic, err := operations.TopicFromYaml(kafkaCtl.GetStdOut())
if err != nil {
return err
}

if len(topic.Partitions) != 2 {
return errors.Newf("expected 2 partitions, but was %d", len(topic.Partitions))
}

if len(topic.Partitions[0].Replicas) == 2 && len(topic.Partitions[1].Replicas) == 3 {
if topic.Partitions[0].Replicas[0] == 101 && topic.Partitions[0].Replicas[1] == 102 {
return nil
} else {
return errors.Newf("different brokers expected %v", topic.Partitions[0].Replicas)
}
} else {
return errors.Newf("replica count incorrect %v", topic.Partitions)
}
}
}

err := retry.Retry(
checkReplicas,
strategy.Limit(5),
strategy.Backoff(backoff.Linear(10*time.Millisecond)),
)

if err != nil {
t.Fatalf("could not check Replicas for topic %s: %v", topicName, err)
}
}
4 changes: 4 additions & 0 deletions cmd/alter/alter-topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ func newAlterTopicCmd() *cobra.Command {
}

cmdAlterTopic.Flags().Int32VarP(&flags.Partitions, "partitions", "p", flags.Partitions, "number of partitions")
cmdAlterTopic.Flags().Int16VarP(&flags.ReplicationFactor, "replication-factor", "r", flags.ReplicationFactor, "replication factor")
cmdAlterTopic.Flags().StringArrayVarP(&flags.Configs, "config", "c", flags.Configs, "configs in format `key=value`")
cmdAlterTopic.Flags().BoolVarP(&flags.ValidateOnly, "validate-only", "v", false, "validate only")

Expand All @@ -39,6 +40,9 @@ func newAlterTopicCmd() *cobra.Command {
if err := validation.MarkFlagAtLeastOneRequired(cmdAlterTopic.Flags(), "config"); err != nil {
panic(err)
}
if err := validation.MarkFlagAtLeastOneRequired(cmdAlterTopic.Flags(), "replication-factor"); err != nil {
panic(err)
}

return cmdAlterTopic
}
Loading