Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle isolation level in Offset(Request|Response) and require stable offset in FetchOffset(Request|Response) #1929

Merged
merged 1 commit into from
Apr 29, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,7 @@ func (b *Broker) CommitOffset(request *OffsetCommitRequest) (*OffsetCommitRespon
//FetchOffset returns an offset fetch response or error
func (b *Broker) FetchOffset(request *OffsetFetchRequest) (*OffsetFetchResponse, error) {
response := new(OffsetFetchResponse)
response.Version = request.Version // needed to handle the two header versions
dnwe marked this conversation as resolved.
Show resolved Hide resolved

err := b.sendAndReceive(request, response)
if err != nil {
Expand Down
18 changes: 18 additions & 0 deletions errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,12 @@ const (
ErrPreferredLeaderNotAvailable KError = 80
ErrGroupMaxSizeReached KError = 81
ErrFencedInstancedId KError = 82
ErrEligibleLeadersNotAvailable KError = 83
ErrElectionNotNeeded KError = 84
ErrNoReassignmentInProgress KError = 85
ErrGroupSubscribedToTopic KError = 86
ErrInvalidRecord KError = 87
ErrUnstableOffsetCommit KError = 88
)

func (err KError) Error() string {
Expand Down Expand Up @@ -382,6 +388,18 @@ func (err KError) Error() string {
return "kafka server: Consumer group The consumer group has reached its max size. already has the configured maximum number of members."
case ErrFencedInstancedId:
return "kafka server: The broker rejected this static consumer since another consumer with the same group.instance.id has registered with a different member.id."
case ErrEligibleLeadersNotAvailable:
return "kafka server: Eligible topic partition leaders are not available."
case ErrElectionNotNeeded:
return "kafka server: Leader election not needed for topic partition."
case ErrNoReassignmentInProgress:
return "kafka server: No partition reassignment is in progress."
case ErrGroupSubscribedToTopic:
return "kafka server: Deleting offsets of a topic is forbidden while the consumer group is actively subscribed to it."
case ErrInvalidRecord:
return "kafka server: This record has failed the validation on broker and hence will be rejected."
case ErrUnstableOffsetCommit:
return "kafka server: There are unstable offsets that need to be cleared."
}

return fmt.Sprintf("Unknown error, how did this happen? Error code = %d", err)
Expand Down
137 changes: 120 additions & 17 deletions offset_fetch_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,60 +3,155 @@ package sarama
type OffsetFetchRequest struct {
Version int16
ConsumerGroup string
RequireStable bool // requires v7+
partitions map[string][]int32
}

func (r *OffsetFetchRequest) encode(pe packetEncoder) (err error) {
if r.Version < 0 || r.Version > 5 {
if r.Version < 0 || r.Version > 7 {
return PacketEncodingError{"invalid or unsupported OffsetFetchRequest version field"}
}

if err = pe.putString(r.ConsumerGroup); err != nil {
isFlexible := r.Version >= 6

if isFlexible {
err = pe.putCompactString(r.ConsumerGroup)
} else {
err = pe.putString(r.ConsumerGroup)
}
if err != nil {
return err
}

if r.Version >= 2 && r.partitions == nil {
pe.putInt32(-1)
} else {
if err = pe.putArrayLength(len(r.partitions)); err != nil {
return err
if isFlexible {
if r.partitions == nil {
pe.putUVarint(0)
} else {
pe.putCompactArrayLength(len(r.partitions))
}
for topic, partitions := range r.partitions {
if err = pe.putString(topic); err != nil {
return err
}
if err = pe.putInt32Array(partitions); err != nil {
} else {
if r.partitions == nil && r.Version >= 2 {
pe.putInt32(-1)
} else {
if err = pe.putArrayLength(len(r.partitions)); err != nil {
return err
}
}
}

for topic, partitions := range r.partitions {
if isFlexible {
err = pe.putCompactString(topic)
} else {
err = pe.putString(topic)
}
if err != nil {
return err
}

//

if isFlexible {
err = pe.putCompactInt32Array(partitions)
} else {
err = pe.putInt32Array(partitions)
}
if err != nil {
return err
}

if isFlexible {
pe.putEmptyTaggedFieldArray()
}
}

if r.RequireStable && r.Version < 7 {
return PacketEncodingError{"requireStable is not supported. use version 7 or later"}
}

if r.Version >= 7 {
pe.putBool(r.RequireStable)
}

if isFlexible {
pe.putEmptyTaggedFieldArray()
}

return nil
}

func (r *OffsetFetchRequest) decode(pd packetDecoder, version int16) (err error) {
r.Version = version
if r.ConsumerGroup, err = pd.getString(); err != nil {
isFlexible := r.Version >= 6
if isFlexible {
r.ConsumerGroup, err = pd.getCompactString()
} else {
r.ConsumerGroup, err = pd.getString()
}
if err != nil {
return err
}
partitionCount, err := pd.getArrayLength()

var partitionCount int

if isFlexible {
partitionCount, err = pd.getCompactArrayLength()
} else {
partitionCount, err = pd.getArrayLength()
}
if err != nil {
return err
}

if (partitionCount == 0 && version < 2) || partitionCount < 0 {
return nil
}
r.partitions = make(map[string][]int32)

r.partitions = make(map[string][]int32, partitionCount)
for i := 0; i < partitionCount; i++ {
topic, err := pd.getString()
var topic string
if isFlexible {
topic, err = pd.getCompactString()
} else {
topic, err = pd.getString()
}
if err != nil {
return err
}
partitions, err := pd.getInt32Array()

var partitions []int32
if isFlexible {
partitions, err = pd.getCompactInt32Array()
} else {
partitions, err = pd.getInt32Array()
}
if err != nil {
return err
}
if isFlexible {
_, err = pd.getEmptyTaggedFieldArray()
if err != nil {
return err
}
}

r.partitions[topic] = partitions
}

if r.Version >= 7 {
r.RequireStable, err = pd.getBool()
if err != nil {
return err
}
}

if isFlexible {
_, err = pd.getEmptyTaggedFieldArray()
if err != nil {
return err
}
}

return nil
}

Expand All @@ -69,6 +164,10 @@ func (r *OffsetFetchRequest) version() int16 {
}

func (r *OffsetFetchRequest) headerVersion() int16 {
if r.Version >= 6 {
return 2
}

return 1
}

Expand All @@ -84,6 +183,10 @@ func (r *OffsetFetchRequest) requiredVersion() KafkaVersion {
return V2_0_0_0
case 5:
return V2_1_0_0
case 6:
return V2_4_0_0
case 7:
return V2_5_0_0
default:
return MinVersion
}
Expand Down
60 changes: 60 additions & 0 deletions offset_fetch_request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,12 @@ var (
0x00, 0x00,
0x00, 0x00, 0x00, 0x00}

offsetFetchRequestNoPartitionsV6 = []byte{
0x05, 'b', 'l', 'a', 'h', 0x01, 0x00}

offsetFetchRequestNoPartitionsV7 = []byte{
0x05, 'b', 'l', 'a', 'h', 0x01, 0x01, 0x00}

offsetFetchRequestNoPartitions = []byte{
0x00, 0x04, 'b', 'l', 'a', 'h',
0x00, 0x00, 0x00, 0x00}
Expand All @@ -21,6 +27,20 @@ var (
0x00, 0x00, 0x00, 0x01,
0x4F, 0x4F, 0x4F, 0x4F}

offsetFetchRequestOnePartitionV6 = []byte{
0x05, 'b', 'l', 'a', 'h',
0x02, 0x0E, 't', 'o', 'p', 'i', 'c', 'T', 'h', 'e', 'F', 'i', 'r', 's', 't',
0x02,
0x4F, 0x4F, 0x4F, 0x4F,
0x00, 0x00}

offsetFetchRequestOnePartitionV7 = []byte{
0x05, 'b', 'l', 'a', 'h',
0x02, 0x0E, 't', 'o', 'p', 'i', 'c', 'T', 'h', 'e', 'F', 'i', 'r', 's', 't',
0x02,
0x4F, 0x4F, 0x4F, 0x4F,
0x00, 0x00, 0x00}

offsetFetchRequestAllPartitions = []byte{
0x00, 0x04, 'b', 'l', 'a', 'h',
0xff, 0xff, 0xff, 0xff}
Expand All @@ -36,7 +56,29 @@ func TestOffsetFetchRequestNoPartitions(t *testing.T) {
request.ConsumerGroup = "blah"
testRequest(t, fmt.Sprintf("no partitions %d", version), request, offsetFetchRequestNoPartitions)
}

{ // v6
version := 6
request := new(OffsetFetchRequest)
request.Version = int16(version)
request.ConsumerGroup = "blah"
request.ZeroPartitions()

testRequest(t, fmt.Sprintf("no partitions %d", version), request, offsetFetchRequestNoPartitionsV6)
}

{ // v7
version := 7
request := new(OffsetFetchRequest)
request.Version = int16(version)
request.ConsumerGroup = "blah"
request.RequireStable = true
request.ZeroPartitions()

testRequest(t, fmt.Sprintf("no partitions %d", version), request, offsetFetchRequestNoPartitionsV7)
}
}

func TestOffsetFetchRequest(t *testing.T) {
for version := 0; version <= 5; version++ {
request := new(OffsetFetchRequest)
Expand All @@ -45,6 +87,24 @@ func TestOffsetFetchRequest(t *testing.T) {
request.AddPartition("topicTheFirst", 0x4F4F4F4F)
testRequest(t, fmt.Sprintf("one partition %d", version), request, offsetFetchRequestOnePartition)
}

{ //v6
version := 6
request := new(OffsetFetchRequest)
request.Version = int16(version)
request.ConsumerGroup = "blah"
request.AddPartition("topicTheFirst", 0x4F4F4F4F)
testRequest(t, fmt.Sprintf("one partition %d", version), request, offsetFetchRequestOnePartitionV6)
}

{ //v7
version := 7
request := new(OffsetFetchRequest)
request.Version = int16(version)
request.ConsumerGroup = "blah"
request.AddPartition("topicTheFirst", 0x4F4F4F4F)
testRequest(t, fmt.Sprintf("one partition %d", version), request, offsetFetchRequestOnePartitionV7)
}
}

func TestOffsetFetchRequestAllPartitions(t *testing.T) {
Expand Down
Loading