Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fixed variable names that are named same as some std lib package names #1738

Merged
merged 3 commits into from
Jul 15, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,6 @@ profile.out

simplest-uncommitted-msg-0.1-jar-with-dependencies.jar

.idea
varun06 marked this conversation as resolved.
Show resolved Hide resolved


varun06 marked this conversation as resolved.
Show resolved Hide resolved
16 changes: 8 additions & 8 deletions admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -802,7 +802,7 @@ func (ca *clusterAdmin) ListConsumerGroups() (allGroups map[string]string, err e
// Query brokers in parallel, since we have to query *all* brokers
brokers := ca.client.Brokers()
groupMaps := make(chan map[string]string, len(brokers))
errors := make(chan error, len(brokers))
errChan := make(chan error, len(brokers))
wg := sync.WaitGroup{}

for _, b := range brokers {
Expand All @@ -813,7 +813,7 @@ func (ca *clusterAdmin) ListConsumerGroups() (allGroups map[string]string, err e

response, err := b.ListGroups(&ListGroupsRequest{})
if err != nil {
errors <- err
errChan <- err
return
}

Expand All @@ -828,7 +828,7 @@ func (ca *clusterAdmin) ListConsumerGroups() (allGroups map[string]string, err e

wg.Wait()
close(groupMaps)
close(errors)
close(errChan)

for groupMap := range groupMaps {
for group, protocolType := range groupMap {
Expand All @@ -837,7 +837,7 @@ func (ca *clusterAdmin) ListConsumerGroups() (allGroups map[string]string, err e
}

// Intentionally return only the first error for simplicity
err = <-errors
err = <-errChan
return
}

Expand Down Expand Up @@ -893,7 +893,7 @@ func (ca *clusterAdmin) DescribeLogDirs(brokerIds []int32) (allLogDirs map[int32

// Query brokers in parallel, since we may have to query multiple brokers
logDirsMaps := make(chan map[int32][]DescribeLogDirsResponseDirMetadata, len(brokerIds))
errors := make(chan error, len(brokerIds))
errChan := make(chan error, len(brokerIds))
wg := sync.WaitGroup{}

for _, b := range brokerIds {
Expand All @@ -909,7 +909,7 @@ func (ca *clusterAdmin) DescribeLogDirs(brokerIds []int32) (allLogDirs map[int32

response, err := b.DescribeLogDirs(&DescribeLogDirsRequest{})
if err != nil {
errors <- err
errChan <- err
return
}
logDirs := make(map[int32][]DescribeLogDirsResponseDirMetadata)
Expand All @@ -920,7 +920,7 @@ func (ca *clusterAdmin) DescribeLogDirs(brokerIds []int32) (allLogDirs map[int32

wg.Wait()
close(logDirsMaps)
close(errors)
close(errChan)

for logDirsMap := range logDirsMaps {
for id, logDirs := range logDirsMap {
Expand All @@ -929,6 +929,6 @@ func (ca *clusterAdmin) DescribeLogDirs(brokerIds []int32) (allLogDirs map[int32
}

// Intentionally return only the first error for simplicity
err = <-errors
err = <-errChan
return
}
14 changes: 7 additions & 7 deletions async_producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1248,21 +1248,21 @@ func ExampleAsyncProducer_select() {
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt)

var enqueued, errors int
var enqueued, producerError int
varun06 marked this conversation as resolved.
Show resolved Hide resolved
ProducerLoop:
for {
select {
case producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder("testing 123")}:
enqueued++
case err := <-producer.Errors():
log.Println("Failed to produce message", err)
errors++
producerError++
case <-signals:
break ProducerLoop
}
}

log.Printf("Enqueued: %d; errors: %d\n", enqueued, errors)
log.Printf("Enqueued: %d; producerError: %d\n", enqueued, producerError)
varun06 marked this conversation as resolved.
Show resolved Hide resolved
}

// This example shows how to use the producer with separate goroutines
Expand All @@ -1282,8 +1282,8 @@ func ExampleAsyncProducer_goroutines() {
signal.Notify(signals, os.Interrupt)

var (
wg sync.WaitGroup
enqueued, successes, errors int
wg sync.WaitGroup
enqueued, successes, producerError int
)

wg.Add(1)
Expand All @@ -1299,7 +1299,7 @@ func ExampleAsyncProducer_goroutines() {
defer wg.Done()
for err := range producer.Errors() {
log.Println(err)
errors++
producerError++
}
}()

Expand All @@ -1318,5 +1318,5 @@ ProducerLoop:

wg.Wait()

log.Printf("Successfully produced: %d; errors: %d\n", successes, errors)
log.Printf("Successfully produced: %d; producerError: %d\n", successes, producerError)
varun06 marked this conversation as resolved.
Show resolved Hide resolved
}
6 changes: 3 additions & 3 deletions balance_strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -690,11 +690,11 @@ func sortPartitionsByPotentialConsumerAssignments(partition2AllPotentialConsumer
}

func deepCopyAssignment(assignment map[string][]topicPartitionAssignment) map[string][]topicPartitionAssignment {
copy := make(map[string][]topicPartitionAssignment, len(assignment))
m := make(map[string][]topicPartitionAssignment, len(assignment))
for memberID, subscriptions := range assignment {
copy[memberID] = append(subscriptions[:0:0], subscriptions...)
m[memberID] = append(subscriptions[:0:0], subscriptions...)
}
return copy
return m
}

func areSubscriptionsIdentical(partition2AllPotentialConsumers map[topicPartitionAssignment][]string, consumer2AllPotentialPartitions map[string][]topicPartitionAssignment) bool {
Expand Down
8 changes: 4 additions & 4 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -422,13 +422,13 @@ func (child *partitionConsumer) AsyncClose() {
func (child *partitionConsumer) Close() error {
child.AsyncClose()

var errors ConsumerErrors
var consumerErrors ConsumerErrors
varun06 marked this conversation as resolved.
Show resolved Hide resolved
for err := range child.errors {
errors = append(errors, err)
consumerErrors = append(consumerErrors, err)
}

if len(errors) > 0 {
return errors
if len(consumerErrors) > 0 {
return consumerErrors
}
return nil
}
Expand Down
14 changes: 7 additions & 7 deletions consumer_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,36 +255,36 @@ func (c *consumerGroup) newSession(ctx context.Context, topics []string, handler
}

// Sync consumer group
sync, err := c.syncGroupRequest(coordinator, plan, join.GenerationId)
groupRequest, err := c.syncGroupRequest(coordinator, plan, join.GenerationId)
if err != nil {
_ = coordinator.Close()
return nil, err
}
switch sync.Err {
switch groupRequest.Err {
case ErrNoError:
case ErrUnknownMemberId, ErrIllegalGeneration: // reset member ID and retry immediately
c.memberID = ""
return c.newSession(ctx, topics, handler, retries)
case ErrNotCoordinatorForConsumer: // retry after backoff with coordinator refresh
if retries <= 0 {
return nil, sync.Err
return nil, groupRequest.Err
}

return c.retryNewSession(ctx, topics, handler, retries, true)
case ErrRebalanceInProgress: // retry after backoff
if retries <= 0 {
return nil, sync.Err
return nil, groupRequest.Err
}

return c.retryNewSession(ctx, topics, handler, retries, false)
default:
return nil, sync.Err
return nil, groupRequest.Err
}

// Retrieve and sort claims
var claims map[string][]int32
if len(sync.MemberAssignment) > 0 {
members, err := sync.GetMemberAssignment()
if len(groupRequest.MemberAssignment) > 0 {
members, err := groupRequest.GetMemberAssignment()
if err != nil {
return nil, err
}
Expand Down