Skip to content

Commit

Permalink
Merge branch 'master' into cd/1577/consumer-group-mocks
Browse files Browse the repository at this point in the history
  • Loading branch information
Kranti Deep committed Aug 12, 2020
2 parents 94dce3e + c1c2a08 commit 6dbd2ce
Show file tree
Hide file tree
Showing 26 changed files with 496 additions and 61 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ jobs:
fail-fast: false
matrix:
go-version: [1.14.x]
kafka-version: [2.4.1, 2.5.0]
kafka-version: [2.4.1, 2.6.0]
platform: [ubuntu-latest]

env:
Expand Down Expand Up @@ -41,7 +41,7 @@ jobs:

- name: Install dependencies
run: |
curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(go env GOPATH)/bin v1.27.0
curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(go env GOPATH)/bin v1.30.0
export REPOSITORY_ROOT=${GITHUB_WORKSPACE}
- name: Run test suite
Expand Down
1 change: 0 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,3 @@ coverage.txt
profile.out

simplest-uncommitted-msg-0.1-jar-with-dependencies.jar

4 changes: 3 additions & 1 deletion .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,12 @@ linters:
- varcheck
- whitespace
# - goconst
# - gochecknoinits
- gochecknoinits

issues:
exclude:
- consider giving a name to these results
- include an explanation for nolint directive
- Potential Integer overflow made by strconv.Atoi result conversion to int16/32
- Use of weak random number generator
- TLS MinVersion too low
67 changes: 67 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,73 @@

#### Unreleased

#### Version 1.27.0 (2020-08-11)

# Improvements

#1466 - @rubenvp8510 - Expose kerberos fast negotiation configuration
#1695 - @KJTsanaktsidis - Use docker-compose to run the functional tests
#1699 - @wclaeys - Consumer group support for manually comitting offsets
#1714 - @bai - Bump Go to version 1.14.3, golangci-lint to 1.27.0
#1726 - @d1egoaz - Include zstd on the functional tests
#1730 - @d1egoaz - KIP-42 Add producer and consumer interceptors
#1738 - @varun06 - fixed variable names that are named same as some std lib package names
#1741 - @varun06 - updated zstd dependency to latest v1.10.10
#1743 - @varun06 - Fixed declaration dependencies and other lint issues in code base
#1763 - @alrs - remove deprecated tls options from test
#1769 - @bai - Add support for Kafka 2.6.0

# Fixes

#1697 - @kvch - Use gofork for encoding/asn1 to fix ASN errors during Kerberos authentication
#1744 - @alrs - Fix isBalanced Function Signature

#### Version 1.26.4 (2020-05-19)

# Fixes

- #1701 - @d1egoaz - Set server name only for the current broker
- #1694 - @dnwe - testfix: set KAFKA_HEAP_OPTS for zk and kafka

#### Version 1.26.3 (2020-05-07)

# Fixes

- #1692 - @d1egoaz - Set tls ServerName to fix issue: either ServerName or InsecureSkipVerify must be specified in the tls.Config

#### Version 1.26.2 (2020-05-06)

# ⚠️ Known Issues

This release has been marked as not ready for production and may be unstable, please use v1.26.4.

# Improvements

- #1560 - @iyacontrol - add sync pool for gzip 1-9
- #1605 - @dnwe - feat: protocol support for V11 fetch w/ rackID
- #1617 - @sladkoff / @dwi-di / @random-dwi - Add support for alter/list partition reassignements APIs
- #1632 - @bai - Add support for Go 1.14
- #1640 - @random-dwi - Feature/fix list partition reassignments
- #1646 - @mimaison - Add DescribeLogDirs to admin client
- #1667 - @bai - Add support for kafka 2.5.0

# Fixes

- #1594 - @sladkoff - Sets ConfigEntry.Default flag in addition to the ConfigEntry.Source for Kafka versions > V1_1_0_0
- #1601 - @alrs - fix: remove use of testing.T.FailNow() inside goroutine
- #1602 - @d1egoaz - adds a note about consumer groups Consume method
- #1607 - @darklore - Fix memory leak when Broker.Open and Broker.Close called repeatedly
- #1613 - @wblakecaldwell - Updated "retrying" log message when BackoffFunc implemented
- #1614 - @alrs - produce_response.go: Remove Unused Functions
- #1619 - @alrs - tools/kafka-producer-performance: prune unused flag variables
- #1639 - @agriffaut - Handle errors with no message but error code
- #1643 - @kzinglzy - fix `config.net.keepalive`
- #1644 - @KJTsanaktsidis - Fix brokers continually allocating new Session IDs
- #1645 - @Stephan14 - Remove broker(s) which no longer exist in metadata
- #1650 - @lavoiesl - Return the response error in heartbeatLoop
- #1661 - @KJTsanaktsidis - Fix "broker received out of order sequence" when brokers die
- #1666 - @KevinJCross - Bugfix: Allow TLS connections to work over socks proxy.

#### Version 1.26.1 (2020-02-04)

Improvements:
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ You might also want to look at the [Frequently Asked Questions](https://github.c
Sarama provides a "2 releases + 2 months" compatibility guarantee: we support
the two latest stable releases of Kafka and Go, and we provide a two month
grace period for older releases. This means we currently officially support
Go 1.12 through 1.14, and Kafka 2.3 through 2.5, although older releases are
Go 1.13 through 1.14, and Kafka 2.4 through 2.6, although older releases are
still likely to work.

Sarama follows semantic versioning and provides API stability via the gopkg.in service.
Expand Down
16 changes: 8 additions & 8 deletions admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -802,7 +802,7 @@ func (ca *clusterAdmin) ListConsumerGroups() (allGroups map[string]string, err e
// Query brokers in parallel, since we have to query *all* brokers
brokers := ca.client.Brokers()
groupMaps := make(chan map[string]string, len(brokers))
errors := make(chan error, len(brokers))
errChan := make(chan error, len(brokers))
wg := sync.WaitGroup{}

for _, b := range brokers {
Expand All @@ -813,7 +813,7 @@ func (ca *clusterAdmin) ListConsumerGroups() (allGroups map[string]string, err e

response, err := b.ListGroups(&ListGroupsRequest{})
if err != nil {
errors <- err
errChan <- err
return
}

Expand All @@ -828,7 +828,7 @@ func (ca *clusterAdmin) ListConsumerGroups() (allGroups map[string]string, err e

wg.Wait()
close(groupMaps)
close(errors)
close(errChan)

for groupMap := range groupMaps {
for group, protocolType := range groupMap {
Expand All @@ -837,7 +837,7 @@ func (ca *clusterAdmin) ListConsumerGroups() (allGroups map[string]string, err e
}

// Intentionally return only the first error for simplicity
err = <-errors
err = <-errChan
return
}

Expand Down Expand Up @@ -893,7 +893,7 @@ func (ca *clusterAdmin) DescribeLogDirs(brokerIds []int32) (allLogDirs map[int32

// Query brokers in parallel, since we may have to query multiple brokers
logDirsMaps := make(chan map[int32][]DescribeLogDirsResponseDirMetadata, len(brokerIds))
errors := make(chan error, len(brokerIds))
errChan := make(chan error, len(brokerIds))
wg := sync.WaitGroup{}

for _, b := range brokerIds {
Expand All @@ -909,7 +909,7 @@ func (ca *clusterAdmin) DescribeLogDirs(brokerIds []int32) (allLogDirs map[int32

response, err := b.DescribeLogDirs(&DescribeLogDirsRequest{})
if err != nil {
errors <- err
errChan <- err
return
}
logDirs := make(map[int32][]DescribeLogDirsResponseDirMetadata)
Expand All @@ -920,7 +920,7 @@ func (ca *clusterAdmin) DescribeLogDirs(brokerIds []int32) (allLogDirs map[int32

wg.Wait()
close(logDirsMaps)
close(errors)
close(errChan)

for logDirsMap := range logDirsMaps {
for id, logDirs := range logDirsMap {
Expand All @@ -929,6 +929,6 @@ func (ca *clusterAdmin) DescribeLogDirs(brokerIds []int32) (allLogDirs map[int32
}

// Intentionally return only the first error for simplicity
err = <-errors
err = <-errChan
return
}
2 changes: 1 addition & 1 deletion api_versions_request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package sarama
import "testing"

var (
apiVersionRequest = []byte{}
apiVersionRequest []byte
)

func TestApiVersionsRequest(t *testing.T) {
Expand Down
4 changes: 4 additions & 0 deletions async_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,10 @@ func (p *asyncProducer) dispatcher() {
p.inFlight.Add(1)
}

for _, interceptor := range p.conf.Producer.Interceptors {
msg.safelyApplyInterceptor(interceptor)
}

version := 1
if p.conf.Version.IsAtLeast(V0_11_0_0) {
version = 2
Expand Down
Loading

0 comments on commit 6dbd2ce

Please sign in to comment.