Skip to content

Commit

Permalink
Merge pull request #1929 from celrenheit/fetch-offset-require-stable-…
Browse files Browse the repository at this point in the history
…offset

Handle isolation level in Offset(Request|Response) and require stable offset in FetchOffset(Request|Response)
  • Loading branch information
dnwe authored Apr 29, 2021
2 parents 9a7d94e + fce917a commit 16dacfe
Show file tree
Hide file tree
Showing 11 changed files with 422 additions and 44 deletions.
1 change: 1 addition & 0 deletions broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,7 @@ func (b *Broker) CommitOffset(request *OffsetCommitRequest) (*OffsetCommitRespon
//FetchOffset returns an offset fetch response or error
func (b *Broker) FetchOffset(request *OffsetFetchRequest) (*OffsetFetchResponse, error) {
response := new(OffsetFetchResponse)
response.Version = request.Version // needed to handle the two header versions

err := b.sendAndReceive(request, response)
if err != nil {
Expand Down
18 changes: 18 additions & 0 deletions errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,12 @@ const (
ErrPreferredLeaderNotAvailable KError = 80
ErrGroupMaxSizeReached KError = 81
ErrFencedInstancedId KError = 82
ErrEligibleLeadersNotAvailable KError = 83
ErrElectionNotNeeded KError = 84
ErrNoReassignmentInProgress KError = 85
ErrGroupSubscribedToTopic KError = 86
ErrInvalidRecord KError = 87
ErrUnstableOffsetCommit KError = 88
)

func (err KError) Error() string {
Expand Down Expand Up @@ -382,6 +388,18 @@ func (err KError) Error() string {
return "kafka server: Consumer group The consumer group has reached its max size. already has the configured maximum number of members."
case ErrFencedInstancedId:
return "kafka server: The broker rejected this static consumer since another consumer with the same group.instance.id has registered with a different member.id."
case ErrEligibleLeadersNotAvailable:
return "kafka server: Eligible topic partition leaders are not available."
case ErrElectionNotNeeded:
return "kafka server: Leader election not needed for topic partition."
case ErrNoReassignmentInProgress:
return "kafka server: No partition reassignment is in progress."
case ErrGroupSubscribedToTopic:
return "kafka server: Deleting offsets of a topic is forbidden while the consumer group is actively subscribed to it."
case ErrInvalidRecord:
return "kafka server: This record has failed the validation on broker and hence will be rejected."
case ErrUnstableOffsetCommit:
return "kafka server: There are unstable offsets that need to be cleared."
}

return fmt.Sprintf("Unknown error, how did this happen? Error code = %d", err)
Expand Down
137 changes: 120 additions & 17 deletions offset_fetch_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,60 +3,155 @@ package sarama
type OffsetFetchRequest struct {
Version int16
ConsumerGroup string
RequireStable bool // requires v7+
partitions map[string][]int32
}

func (r *OffsetFetchRequest) encode(pe packetEncoder) (err error) {
if r.Version < 0 || r.Version > 5 {
if r.Version < 0 || r.Version > 7 {
return PacketEncodingError{"invalid or unsupported OffsetFetchRequest version field"}
}

if err = pe.putString(r.ConsumerGroup); err != nil {
isFlexible := r.Version >= 6

if isFlexible {
err = pe.putCompactString(r.ConsumerGroup)
} else {
err = pe.putString(r.ConsumerGroup)
}
if err != nil {
return err
}

if r.Version >= 2 && r.partitions == nil {
pe.putInt32(-1)
} else {
if err = pe.putArrayLength(len(r.partitions)); err != nil {
return err
if isFlexible {
if r.partitions == nil {
pe.putUVarint(0)
} else {
pe.putCompactArrayLength(len(r.partitions))
}
for topic, partitions := range r.partitions {
if err = pe.putString(topic); err != nil {
return err
}
if err = pe.putInt32Array(partitions); err != nil {
} else {
if r.partitions == nil && r.Version >= 2 {
pe.putInt32(-1)
} else {
if err = pe.putArrayLength(len(r.partitions)); err != nil {
return err
}
}
}

for topic, partitions := range r.partitions {
if isFlexible {
err = pe.putCompactString(topic)
} else {
err = pe.putString(topic)
}
if err != nil {
return err
}

//

if isFlexible {
err = pe.putCompactInt32Array(partitions)
} else {
err = pe.putInt32Array(partitions)
}
if err != nil {
return err
}

if isFlexible {
pe.putEmptyTaggedFieldArray()
}
}

if r.RequireStable && r.Version < 7 {
return PacketEncodingError{"requireStable is not supported. use version 7 or later"}
}

if r.Version >= 7 {
pe.putBool(r.RequireStable)
}

if isFlexible {
pe.putEmptyTaggedFieldArray()
}

return nil
}

func (r *OffsetFetchRequest) decode(pd packetDecoder, version int16) (err error) {
r.Version = version
if r.ConsumerGroup, err = pd.getString(); err != nil {
isFlexible := r.Version >= 6
if isFlexible {
r.ConsumerGroup, err = pd.getCompactString()
} else {
r.ConsumerGroup, err = pd.getString()
}
if err != nil {
return err
}
partitionCount, err := pd.getArrayLength()

var partitionCount int

if isFlexible {
partitionCount, err = pd.getCompactArrayLength()
} else {
partitionCount, err = pd.getArrayLength()
}
if err != nil {
return err
}

if (partitionCount == 0 && version < 2) || partitionCount < 0 {
return nil
}
r.partitions = make(map[string][]int32)

r.partitions = make(map[string][]int32, partitionCount)
for i := 0; i < partitionCount; i++ {
topic, err := pd.getString()
var topic string
if isFlexible {
topic, err = pd.getCompactString()
} else {
topic, err = pd.getString()
}
if err != nil {
return err
}
partitions, err := pd.getInt32Array()

var partitions []int32
if isFlexible {
partitions, err = pd.getCompactInt32Array()
} else {
partitions, err = pd.getInt32Array()
}
if err != nil {
return err
}
if isFlexible {
_, err = pd.getEmptyTaggedFieldArray()
if err != nil {
return err
}
}

r.partitions[topic] = partitions
}

if r.Version >= 7 {
r.RequireStable, err = pd.getBool()
if err != nil {
return err
}
}

if isFlexible {
_, err = pd.getEmptyTaggedFieldArray()
if err != nil {
return err
}
}

return nil
}

Expand All @@ -69,6 +164,10 @@ func (r *OffsetFetchRequest) version() int16 {
}

func (r *OffsetFetchRequest) headerVersion() int16 {
if r.Version >= 6 {
return 2
}

return 1
}

Expand All @@ -84,6 +183,10 @@ func (r *OffsetFetchRequest) requiredVersion() KafkaVersion {
return V2_0_0_0
case 5:
return V2_1_0_0
case 6:
return V2_4_0_0
case 7:
return V2_5_0_0
default:
return MinVersion
}
Expand Down
60 changes: 60 additions & 0 deletions offset_fetch_request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,12 @@ var (
0x00, 0x00,
0x00, 0x00, 0x00, 0x00}

offsetFetchRequestNoPartitionsV6 = []byte{
0x05, 'b', 'l', 'a', 'h', 0x01, 0x00}

offsetFetchRequestNoPartitionsV7 = []byte{
0x05, 'b', 'l', 'a', 'h', 0x01, 0x01, 0x00}

offsetFetchRequestNoPartitions = []byte{
0x00, 0x04, 'b', 'l', 'a', 'h',
0x00, 0x00, 0x00, 0x00}
Expand All @@ -21,6 +27,20 @@ var (
0x00, 0x00, 0x00, 0x01,
0x4F, 0x4F, 0x4F, 0x4F}

offsetFetchRequestOnePartitionV6 = []byte{
0x05, 'b', 'l', 'a', 'h',
0x02, 0x0E, 't', 'o', 'p', 'i', 'c', 'T', 'h', 'e', 'F', 'i', 'r', 's', 't',
0x02,
0x4F, 0x4F, 0x4F, 0x4F,
0x00, 0x00}

offsetFetchRequestOnePartitionV7 = []byte{
0x05, 'b', 'l', 'a', 'h',
0x02, 0x0E, 't', 'o', 'p', 'i', 'c', 'T', 'h', 'e', 'F', 'i', 'r', 's', 't',
0x02,
0x4F, 0x4F, 0x4F, 0x4F,
0x00, 0x00, 0x00}

offsetFetchRequestAllPartitions = []byte{
0x00, 0x04, 'b', 'l', 'a', 'h',
0xff, 0xff, 0xff, 0xff}
Expand All @@ -36,7 +56,29 @@ func TestOffsetFetchRequestNoPartitions(t *testing.T) {
request.ConsumerGroup = "blah"
testRequest(t, fmt.Sprintf("no partitions %d", version), request, offsetFetchRequestNoPartitions)
}

{ // v6
version := 6
request := new(OffsetFetchRequest)
request.Version = int16(version)
request.ConsumerGroup = "blah"
request.ZeroPartitions()

testRequest(t, fmt.Sprintf("no partitions %d", version), request, offsetFetchRequestNoPartitionsV6)
}

{ // v7
version := 7
request := new(OffsetFetchRequest)
request.Version = int16(version)
request.ConsumerGroup = "blah"
request.RequireStable = true
request.ZeroPartitions()

testRequest(t, fmt.Sprintf("no partitions %d", version), request, offsetFetchRequestNoPartitionsV7)
}
}

func TestOffsetFetchRequest(t *testing.T) {
for version := 0; version <= 5; version++ {
request := new(OffsetFetchRequest)
Expand All @@ -45,6 +87,24 @@ func TestOffsetFetchRequest(t *testing.T) {
request.AddPartition("topicTheFirst", 0x4F4F4F4F)
testRequest(t, fmt.Sprintf("one partition %d", version), request, offsetFetchRequestOnePartition)
}

{ //v6
version := 6
request := new(OffsetFetchRequest)
request.Version = int16(version)
request.ConsumerGroup = "blah"
request.AddPartition("topicTheFirst", 0x4F4F4F4F)
testRequest(t, fmt.Sprintf("one partition %d", version), request, offsetFetchRequestOnePartitionV6)
}

{ //v7
version := 7
request := new(OffsetFetchRequest)
request.Version = int16(version)
request.ConsumerGroup = "blah"
request.AddPartition("topicTheFirst", 0x4F4F4F4F)
testRequest(t, fmt.Sprintf("one partition %d", version), request, offsetFetchRequestOnePartitionV7)
}
}

func TestOffsetFetchRequestAllPartitions(t *testing.T) {
Expand Down
Loading

0 comments on commit 16dacfe

Please sign in to comment.