Skip to content

Commit

Permalink
fix issues after rebase
Browse files Browse the repository at this point in the history
  • Loading branch information
d-rk committed Nov 27, 2020
1 parent 339edf1 commit fbe2698
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 46 deletions.
58 changes: 33 additions & 25 deletions cmd/alter/alter-partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,39 +2,47 @@ package alter

import (
"github.com/deviceinsight/kafkactl/cmd/validation"
"github.com/deviceinsight/kafkactl/operations/k8s"
"github.com/deviceinsight/kafkactl/operations/partitions"
"github.com/deviceinsight/kafkactl/output"
"github.com/pkg/errors"
"github.com/spf13/cobra"
"strconv"
)

var partitionFlags partitions.AlterPartitionFlags

var cmdAlterPartition = &cobra.Command{
Use: "partition TOPIC PARTITION",
Short: "alter a topic",
Args: cobra.ExactArgs(2),
Run: func(cmd *cobra.Command, args []string) {

var partition int32

if i, err := strconv.ParseInt(args[1], 10, 64); err != nil {
output.Failf("argument 2 needs to be a partition %s", args[1])
} else {
partition = int32(i)
}

(&partitions.PartitionOperation{}).AlterPartition(args[0], partition, partitionFlags)
},
PreRunE: func(cmd *cobra.Command, args []string) error {
return validation.ValidateAtLeastOneRequiredFlag(cmd)
},
}
func newAlterPartitionCmd() *cobra.Command {

var flags partitions.AlterPartitionFlags

var cmdAlterPartition = &cobra.Command{
Use: "partition TOPIC PARTITION",
Short: "alter a topic",
Args: cobra.ExactArgs(2),
Run: func(cmd *cobra.Command, args []string) {
if !(&k8s.K8sOperation{}).TryRun(cmd, args) {

var partition int32

if i, err := strconv.ParseInt(args[1], 10, 64); err != nil {
output.Fail(errors.Errorf("argument 2 needs to be a partition %s", args[1]))
} else {
partition = int32(i)
}

if err := (&partitions.PartitionOperation{}).AlterPartition(args[0], partition, flags); err != nil {
output.Fail(err)
}
}
},
PreRunE: func(cmd *cobra.Command, args []string) error {
return validation.ValidateAtLeastOneRequiredFlag(cmd)
},
}

func init() {
cmdAlterPartition.Flags().Int32SliceVarP(&partitionFlags.Replicas, "replicas", "r", nil, "set replicas for a partition")
cmdAlterPartition.Flags().Int32SliceVarP(&flags.Replicas, "replicas", "r", nil, "set replicas for a partition")

if err := validation.MarkFlagAtLeastOneRequired(cmdAlterPartition.Flags(), "replicas"); err != nil {
output.Failf("internal error: %v", err)
panic(err)
}
return cmdAlterPartition
}
2 changes: 1 addition & 1 deletion cmd/alter/alter.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
func NewAlterCmd() *cobra.Command {

var cmdAlter = &cobra.Command{
Use: "alter",
Use: "alter",
Aliases: []string{"edit"},
Short: "alter topics, partitions",
}
Expand Down
29 changes: 17 additions & 12 deletions operations/partitions/partition-operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"github.com/Shopify/sarama"
"github.com/deviceinsight/kafkactl/operations"
"github.com/deviceinsight/kafkactl/output"
"github.com/pkg/errors"
)

type AlterPartitionFlags struct {
Expand All @@ -13,31 +14,34 @@ type AlterPartitionFlags struct {
type PartitionOperation struct {
}

func (operation *PartitionOperation) AlterPartition(topic string, partition int32, flags AlterPartitionFlags) {

context := operations.CreateClientContext()
func (operation *PartitionOperation) AlterPartition(topic string, partition int32, flags AlterPartitionFlags) error {

var (
client sarama.Client
admin sarama.ClusterAdmin
err error
exists bool
context operations.ClientContext
client sarama.Client
admin sarama.ClusterAdmin
err error
exists bool
)

if context, err = operations.CreateClientContext(); err != nil {
return err
}

if client, err = operations.CreateClient(&context); err != nil {
output.Failf("failed to create client err=%v", err)
return err
}

if exists, err = operations.TopicExists(&client, topic); err != nil {
output.Failf("failed to read topics err=%v", err)
return err
}

if !exists {
output.Failf("topic '%s' does not exist", topic)
return errors.Errorf("topic '%s' does not exist", topic)
}

if admin, err = operations.CreateClusterAdmin(&context); err != nil {
output.Failf("failed to create cluster admin: %v", err)
return err
}

if len(flags.Replicas) > 0 {
Expand All @@ -52,7 +56,7 @@ func (operation *PartitionOperation) AlterPartition(topic string, partition int3
}

if !brokerIdFound {
output.Failf("unknown broker id to be used as replica: %d", replica)
return errors.Errorf("unknown broker id to be used as replica: %d", replica)
}
}

Expand All @@ -61,4 +65,5 @@ func (operation *PartitionOperation) AlterPartition(topic string, partition int3
}

output.Infof(" hello replicas %v", flags.Replicas)
return nil
}
18 changes: 10 additions & 8 deletions operations/topic-operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,9 @@ import (
"fmt"
"github.com/Shopify/sarama"
"github.com/deviceinsight/kafkactl/output"
"github.com/deviceinsight/kafkactl/util"
"github.com/pkg/errors"
"github.com/spf13/cobra"
"math/rand"
"github.com/deviceinsight/kafkactl/util"
"sort"
"strconv"
"strings"
Expand Down Expand Up @@ -269,7 +268,7 @@ func (operation *TopicOperation) AlterTopic(topic string, flags AlterTopicFlags)
var brokers = client.Brokers()

if int(flags.ReplicationFactor) > len(brokers) {
output.Failf("Replication factor for topic '%s' must not exceed the number of available brokers", topic)
return errors.Errorf("Replication factor for topic '%s' must not exceed the number of available brokers", topic)
}

t, _ = readTopic(&client, &admin, topic, requestedTopicFields{partitionId: true, partitionReplicas: true})
Expand All @@ -289,13 +288,16 @@ func (operation *TopicOperation) AlterTopic(topic string, flags AlterTopicFlags)

for _, partition := range t.Partitions {

var replicas = getTargetReplicas(partition.Replicas, brokerReplicaCount, flags.ReplicationFactor)
var replicas, err = getTargetReplicas(partition.Replicas, brokerReplicaCount, flags.ReplicationFactor)
if err != nil {
return errors.Wrap(err, "unable to determine target replicas")
}
replicaAssignment = append(replicaAssignment, replicas)
}

err = admin.AlterPartitionReassignments(topic, replicaAssignment)
if err != nil {
output.Failf("Could not create partitions for topic '%s': %v", topic, err)
return errors.Errorf("Could not create partitions for topic '%s': %v", topic, err)
}
}

Expand Down Expand Up @@ -352,7 +354,7 @@ func (operation *TopicOperation) ListTopicsNames() ([]string, error) {
}
}

func getTargetReplicas(currentReplicas []int32, brokerReplicaCount map[int32]int, targetReplicationFactor int16) []int32 {
func getTargetReplicas(currentReplicas []int32, brokerReplicaCount map[int32]int, targetReplicationFactor int16) ([]int32, error) {

replicas := currentReplicas

Expand All @@ -378,7 +380,7 @@ func getTargetReplicas(currentReplicas []int32, brokerReplicaCount map[int32]int
}
}
if len(unusedBrokerIds) < (int(targetReplicationFactor) - len(replicas)) {
output.Failf("not enough brokers")
return nil, errors.New("not enough brokers")
}
}

Expand All @@ -394,7 +396,7 @@ func getTargetReplicas(currentReplicas []int32, brokerReplicaCount map[int32]int
brokerReplicaCount[unusedBrokerIds[0]] += 1
}

return replicas
return replicas, nil
}

func (operation *TopicOperation) GetTopics(flags GetTopicsFlags) error {
Expand Down

0 comments on commit fbe2698

Please sign in to comment.