Skip to content

Commit

Permalink
only print when validating alter commands (otherwise result might be …
Browse files Browse the repository at this point in the history
…incorrect)
  • Loading branch information
d-rk committed Dec 8, 2020
1 parent 170aa48 commit 4023e31
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 15 deletions.
2 changes: 1 addition & 1 deletion docker/.env
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
CP_VERSION=6.0.1
WAIT_FOR_KAFKA_TIMEOUT=240
WAIT_FOR_KAFKA_TIMEOUT=480
14 changes: 7 additions & 7 deletions operations/partitions/partition-operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,22 +111,22 @@ func (operation *PartitionOperation) AlterPartition(topic string, partitionId in

if statusTopic, ok := status[topic]; ok {
if statusPartition, ok := statusTopic[partitionId]; ok {
output.Debugf("Reassignment running: %s:%d replicas: %v addingReplicas: %v removingReplicas: %v",
output.Infof("reassignment running for topic=%s partition=%d: replicas:%v addingReplicas:%v removingReplicas:%v",
topic, partitionId, statusPartition.Replicas, statusPartition.AddingReplicas, statusPartition.RemovingReplicas)
time.Sleep(2 * time.Second)
time.Sleep(5 * time.Second)
assignmentRunning = true
}
}
}
output.Infof("partition replicas have been reassigned")
}
}

if !flags.ValidateOnly {
if partition, err = readPartition(&client, topic, partitionId); err != nil {
return err
}
if flags.ValidateOnly {
return printPartition(partition)
} else {
return nil
}
return printPartition(partition)
}

func printPartition(p partition) error {
Expand Down
20 changes: 13 additions & 7 deletions operations/topic-operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,8 @@ func (operation *TopicOperation) AlterTopic(topic string, flags AlterTopicFlags)
err = admin.CreatePartitions(topic, flags.Partitions, emptyAssignment, flags.ValidateOnly)
if err != nil {
return errors.Errorf("Could not create partitions for topic '%s': %v", topic, err)
} else {
output.Infof("partitions have been created")
}
}
}
Expand Down Expand Up @@ -338,15 +340,17 @@ func (operation *TopicOperation) AlterTopic(topic string, flags AlterTopicFlags)

if statusTopic, ok := status[topic]; ok {
for partitionId, statusPartition := range statusTopic {
output.Debugf("Reassignment running: %s:%d replicas: %v addingReplicas: %v removingReplicas: %v",
output.Infof("reassignment running for topic=%s partition=%d: replicas:%v addingReplicas:%v removingReplicas:%v",
topic, partitionId, statusPartition.Replicas, statusPartition.AddingReplicas, statusPartition.RemovingReplicas)
time.Sleep(2 * time.Second)
time.Sleep(5 * time.Second)
assignmentRunning = true
}
} else {
output.Debugf("Emtpy list partition reassignment result returned (len status: %d)", len(status))
}
}

output.Infof("partition replicas have been reassigned")
}
}

Expand Down Expand Up @@ -378,16 +382,18 @@ func (operation *TopicOperation) AlterTopic(topic string, flags AlterTopicFlags)
} else {
if err = admin.AlterConfig(sarama.TopicResource, topic, mergedConfigEntries, flags.ValidateOnly); err != nil {
return errors.Errorf("Could not alter topic config '%s': %v", topic, err)
} else {
output.Infof("config has been altered")
}
}
}

if !flags.ValidateOnly {
t, _ = readTopic(&client, &admin, topic, allFields)
if flags.ValidateOnly {
describeFlags := DescribeTopicFlags{PrintConfigs: len(flags.Configs) > 0}
return operation.printTopic(t, describeFlags)
} else {
return nil
}

describeFlags := DescribeTopicFlags{PrintConfigs: len(flags.Configs) > 0}
return operation.printTopic(t, describeFlags)
}

func (operation *TopicOperation) ListTopicsNames() ([]string, error) {
Expand Down

0 comments on commit 4023e31

Please sign in to comment.