Skip to content

Commit

Permalink
fix merge build fail & ut
Browse files Browse the repository at this point in the history
  • Loading branch information
xiaofan committed May 16, 2022
1 parent c7f1512 commit 1c3c916
Show file tree
Hide file tree
Showing 6 changed files with 34 additions and 29 deletions.
9 changes: 5 additions & 4 deletions config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -526,7 +526,8 @@ func TestValidGroupInstanceId(t *testing.T) {
if err == nil {
t.Errorf("Expected validGroupInstanceId %s to be error, got nil", testcase.grouptInstanceId)
}
if _, ok := err.(ConfigurationError); !ok {
var target ConfigurationError
if !errors.As(err, &target) {
t.Errorf("Excepted err to be ConfigurationError, got %v", err)
}
}
Expand All @@ -536,12 +537,12 @@ func TestValidGroupInstanceId(t *testing.T) {
func TestGroupInstanceIdAndVersionValidation(t *testing.T) {
config := NewTestConfig()
config.Consumer.Group.InstanceId = "groupInstanceId1"
if err := config.Validate(); string(err.(ConfigurationError)) != "Consumer.Group.InstanceId need Version >= 2.3" {
t.Error("Expected invalid zstd/kafka version error, got ", err)
if err := config.Validate(); !strings.Contains(err.Error(), "Consumer.Group.InstanceId need Version >= 2.3") {
t.Error("Expected invalid group instance error, got ", err)
}
config.Version = V2_3_0_0
if err := config.Validate(); err != nil {
t.Error("Expected zstd to work, got ", err)
t.Error("Expected group instance to work, got ", err)
}
}

Expand Down
5 changes: 1 addition & 4 deletions consumer_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,11 +308,8 @@ func (c *consumerGroup) newSession(ctx context.Context, topics []string, handler
case ErrMemberIdRequired:
// from JoinGroupRequest v4, if client start with empty member id,
// it need to get member id from response and send another join request to join group
if retries <= 0 {
return nil, join.Err
}
c.memberID = join.MemberId
return c.retryNewSession(ctx, topics, handler, retries, false)
return c.retryNewSession(ctx, topics, handler, retries+1 /*keep retry time*/, false)
case ErrFencedInstancedId:
if c.groupInstanceId != nil {
Logger.Printf("JoinGroup failed: group instance id %s has been fenced\n", *c.groupInstanceId)
Expand Down
22 changes: 12 additions & 10 deletions functional_consumer_staticmembership_test.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
//+build functional
//go:build functional
// +build functional

package sarama

import (
"encoding/json"
"errors"
"math"
"reflect"
"sync/atomic"
Expand All @@ -23,15 +25,15 @@ func TestFuncConsumerGroupStaticMembership_Basic(t *testing.T) {
config1.Version = V2_3_0_0
config1.Consumer.Offsets.Initial = OffsetNewest
config1.Consumer.Group.InstanceId = "Instance1"
m1 := runTestFuncConsumerGroupMemberWithConfig(t, groupID, 100, config1, nil, "test.4")
m1 := runTestFuncConsumerGroupMemberWithConfig(t, config1, groupID, 100, nil, "test.4")
defer m1.Close()

config2 := NewTestConfig()
config2.ClientID = "M2"
config2.Version = V2_3_0_0
config2.Consumer.Offsets.Initial = OffsetNewest
config2.Consumer.Group.InstanceId = "Instance2"
m2 := runTestFuncConsumerGroupMemberWithConfig(t, groupID, 100, config2, nil, "test.4")
m2 := runTestFuncConsumerGroupMemberWithConfig(t, config2, groupID, 100, nil, "test.4")
defer m2.Close()

m1.WaitForState(2)
Expand Down Expand Up @@ -77,15 +79,15 @@ func TestFuncConsumerGroupStaticMembership_RejoinAndLeave(t *testing.T) {
config1.Version = V2_3_0_0
config1.Consumer.Offsets.Initial = OffsetNewest
config1.Consumer.Group.InstanceId = "Instance1"
m1 := runTestFuncConsumerGroupMemberWithConfig(t, groupID, math.MaxInt32, config1, nil, "test.4")
m1 := runTestFuncConsumerGroupMemberWithConfig(t, config1, groupID, math.MaxInt32, nil, "test.4")
defer m1.Close()

config2 := NewTestConfig()
config2.ClientID = "M2"
config2.Version = V2_3_0_0
config2.Consumer.Offsets.Initial = OffsetNewest
config2.Consumer.Group.InstanceId = "Instance2"
m2 := runTestFuncConsumerGroupMemberWithConfig(t, groupID, math.MaxInt32, config2, nil, "test.4")
m2 := runTestFuncConsumerGroupMemberWithConfig(t, config2, groupID, math.MaxInt32, nil, "test.4")
defer m2.Close()

m1.WaitForState(2)
Expand Down Expand Up @@ -128,7 +130,7 @@ func TestFuncConsumerGroupStaticMembership_RejoinAndLeave(t *testing.T) {
}

// m2 rejoin, should generate a new memberId, no re-balance happens
m2 = runTestFuncConsumerGroupMemberWithConfig(t, groupID, math.MaxInt32, config2, nil, "test.4")
m2 = runTestFuncConsumerGroupMemberWithConfig(t, config2, groupID, math.MaxInt32, nil, "test.4")
m2.WaitForState(2)
m1.WaitForState(2)

Expand Down Expand Up @@ -180,15 +182,15 @@ func TestFuncConsumerGroupStaticMembership_Fenced(t *testing.T) {
config1.Version = V2_3_0_0
config1.Consumer.Offsets.Initial = OffsetNewest
config1.Consumer.Group.InstanceId = "Instance1"
m1 := runTestFuncConsumerGroupMemberWithConfig(t, groupID, math.MaxInt32, config1, nil, "test.4")
m1 := runTestFuncConsumerGroupMemberWithConfig(t, config1, groupID, math.MaxInt32, nil, "test.4")
defer m1.Close()

config2 := NewTestConfig()
config2.ClientID = "M2"
config2.Version = V2_3_0_0
config2.Consumer.Offsets.Initial = OffsetNewest
config2.Consumer.Group.InstanceId = "Instance2"
m2 := runTestFuncConsumerGroupMemberWithConfig(t, groupID, math.MaxInt32, config2, nil, "test.4")
m2 := runTestFuncConsumerGroupMemberWithConfig(t, config2, groupID, math.MaxInt32, nil, "test.4")
defer m2.Close()

m1.WaitForState(2)
Expand All @@ -200,7 +202,7 @@ func TestFuncConsumerGroupStaticMembership_Fenced(t *testing.T) {
config3.Consumer.Offsets.Initial = OffsetNewest
config3.Consumer.Group.InstanceId = "Instance2" // same instance id as config2

m3 := runTestFuncConsumerGroupMemberWithConfig(t, groupID, math.MaxInt32, config3, nil, "test.4")
m3 := runTestFuncConsumerGroupMemberWithConfig(t, config3, groupID, math.MaxInt32, nil, "test.4")
defer m3.Close()

m3.WaitForState(2)
Expand All @@ -209,7 +211,7 @@ func TestFuncConsumerGroupStaticMembership_Fenced(t *testing.T) {
if len(m2.errs) < 1 {
t.Errorf("expect m2 to be fenced by group instanced id, but got no err")
}
if m2.errs[0] != ErrFencedInstancedId {
if errors.Is(m2.errs[0], ErrFencedInstancedId) {
t.Errorf("expect m2 to be fenced by group instanced id, but got wrong err %v", m2.errs[0])
}

Expand Down
8 changes: 5 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,18 @@ require (
github.com/fortytw2/leaktest v1.3.0
github.com/frankban/quicktest v1.14.2 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1
github.com/hashicorp/go-uuid v1.0.3 // indirect
github.com/jcmturner/gofork v1.0.0
github.com/jcmturner/gokrb5/v8 v8.4.2
github.com/klauspost/compress v1.15.0
github.com/klauspost/compress v1.15.4
github.com/pierrec/lz4 v2.6.1+incompatible
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475
github.com/stretchr/testify v1.7.0
github.com/xdg-go/scram v1.1.1
golang.org/x/crypto v0.0.0-20220214200702-86341886e292 // indirect
golang.org/x/net v0.0.0-20220225172249-27dd8689420f
golang.org/x/crypto v0.0.0-20220513210258-46612604a0f9 // indirect
golang.org/x/net v0.0.0-20220516155154-20f960328961
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
Expand Down
18 changes: 10 additions & 8 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,14 @@ github.com/google/go-cmp v0.5.7/go.mod h1:n+brtR0CgQNWTVd5ZUFpTBC8YFBDLK/h/bpaJ8
github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+HVt/4epWDjd4=
github.com/gorilla/sessions v1.2.1/go.mod h1:dk2InVEVJ0sfLlnXv9EAgkf6ecYs/i80K/zI+bUmuGM=
github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA=
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I=
github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo=
github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM=
github.com/hashicorp/go-uuid v1.0.2 h1:cfejS+Tpcp13yd5nYHWDI6qVCny6wyX2Mt5SGur2IGE=
github.com/hashicorp/go-uuid v1.0.2/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro=
github.com/hashicorp/go-uuid v1.0.3 h1:2gKiV6YVmrJ1i2CKKa9obLvRieoRGviZFL26PcT/Co8=
github.com/hashicorp/go-uuid v1.0.3/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro=
github.com/jcmturner/aescts/v2 v2.0.0 h1:9YKLH6ey7H4eDBXW8khjYslgyqG2xZikXP0EQFKrle8=
github.com/jcmturner/aescts/v2 v2.0.0/go.mod h1:AiaICIRyfYg35RUkr8yESTqvSy7csK90qZ5xfvvsoNs=
github.com/jcmturner/dnsutils/v2 v2.0.0 h1:lltnkeZGL0wILNvrNiVCR6Ro5PGU/SeBvVO/8c/iPbo=
Expand All @@ -41,8 +43,8 @@ github.com/jcmturner/gokrb5/v8 v8.4.2 h1:6ZIM6b/JJN0X8UM43ZOM6Z4SJzla+a/u7scXFJz
github.com/jcmturner/gokrb5/v8 v8.4.2/go.mod h1:sb+Xq/fTY5yktf/VxLsE3wlfPqQjp0aWNYyvBVK62bc=
github.com/jcmturner/rpc/v2 v2.0.3 h1:7FXXj8Ti1IaVFpSAziCZWNzbNuZmnvw/i6CqLNdWfZY=
github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc=
github.com/klauspost/compress v1.15.0 h1:xqfchp4whNFxn5A4XFyyYtitiWI8Hy5EW59jEwcyL6U=
github.com/klauspost/compress v1.15.0/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
github.com/klauspost/compress v1.15.4 h1:1kn4/7MepF/CHmYub99/nNX8az0IJjfSOU/jbnTVfqQ=
github.com/klauspost/compress v1.15.4/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0=
Expand Down Expand Up @@ -77,13 +79,13 @@ github.com/xdg-go/stringprep v1.0.3 h1:kdwGpVNwPFtjs98xCGkHjQtGKh86rDcRZN17QEMCO
github.com/xdg-go/stringprep v1.0.3/go.mod h1:W3f5j4i+9rC0kuIEJL0ky1VpHXQU3ocBgklLGvcBnW8=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20201112155050-0c6587e931a9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20220214200702-86341886e292 h1:f+lwQ+GtmgoY+A2YaQxlSOnDjXcQ7ZRLWOHbC6HtRqE=
golang.org/x/crypto v0.0.0-20220214200702-86341886e292/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/crypto v0.0.0-20220513210258-46612604a0f9 h1:NUzdAbFtCJSXU20AOXgeqaUwg8Ypg4MPYmL+d+rsB5c=
golang.org/x/crypto v0.0.0-20220513210258-46612604a0f9/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20220225172249-27dd8689420f h1:oA4XRj0qtSt8Yo1Zms0CUlsT3KG69V2UGQWPBxujDmc=
golang.org/x/net v0.0.0-20220225172249-27dd8689420f/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
golang.org/x/net v0.0.0-20220516155154-20f960328961 h1:+W/iTMPG0EL7aW+/atntZwZrvSRIj3m3yX414dSULUU=
golang.org/x/net v0.0.0-20220516155154-20f960328961/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
Expand Down
1 change: 1 addition & 0 deletions heartbeat_response_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ var (
0, 0, 0, 100,
0, byte(ErrFencedInstancedId),
}
)

func TestHeartbeatResponse(t *testing.T) {
tests := []struct {
Expand Down

0 comments on commit 1c3c916

Please sign in to comment.