Skip to content

Commit

Permalink
This commit enables isolation level handling when fetching offsets.
Browse files Browse the repository at this point in the history
It enables list available offset (OffsetRequest and OffsetResponse)
to require read committed isolation level.
It adds RequireStable field to FetchOffsetRequest.
  • Loading branch information
celrenheit committed Apr 29, 2021
1 parent 02d5c83 commit c5e3fdd
Show file tree
Hide file tree
Showing 11 changed files with 422 additions and 44 deletions.
1 change: 1 addition & 0 deletions broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,7 @@ func (b *Broker) CommitOffset(request *OffsetCommitRequest) (*OffsetCommitRespon
//FetchOffset returns an offset fetch response or error
func (b *Broker) FetchOffset(request *OffsetFetchRequest) (*OffsetFetchResponse, error) {
response := new(OffsetFetchResponse)
response.Version = request.Version // needed to handle the two header versions

err := b.sendAndReceive(request, response)
if err != nil {
Expand Down
18 changes: 18 additions & 0 deletions errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,12 @@ const (
ErrPreferredLeaderNotAvailable KError = 80
ErrGroupMaxSizeReached KError = 81
ErrFencedInstancedId KError = 82
ErrEligibleLeadersNotAvailable KError = 83
ErrElectionNotNeeded KError = 84
ErrNoReassignmentInProgress KError = 85
ErrGroupSubscribedToTopic KError = 86
ErrInvalidRecord KError = 87
ErrUnstableOffsetCommit KError = 88
)

func (err KError) Error() string {
Expand Down Expand Up @@ -382,6 +388,18 @@ func (err KError) Error() string {
return "kafka server: Consumer group The consumer group has reached its max size. already has the configured maximum number of members."
case ErrFencedInstancedId:
return "kafka server: The broker rejected this static consumer since another consumer with the same group.instance.id has registered with a different member.id."
case ErrEligibleLeadersNotAvailable:
return "kafka server: Eligible topic partition leaders are not available."
case ErrElectionNotNeeded:
return "kafka server: Leader election not needed for topic partition."
case ErrNoReassignmentInProgress:
return "kafka server: No partition reassignment is in progress."
case ErrGroupSubscribedToTopic:
return "kafka server: Deleting offsets of a topic is forbidden while the consumer group is actively subscribed to it."
case ErrInvalidRecord:
return "kafka server: This record has failed the validation on broker and hence will be rejected."
case ErrUnstableOffsetCommit:
return "kafka server: There are unstable offsets that need to be cleared."
}

return fmt.Sprintf("Unknown error, how did this happen? Error code = %d", err)
Expand Down
137 changes: 120 additions & 17 deletions offset_fetch_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,60 +3,155 @@ package sarama
type OffsetFetchRequest struct {
Version int16
ConsumerGroup string
RequireStable bool // requires v7+
partitions map[string][]int32
}

func (r *OffsetFetchRequest) encode(pe packetEncoder) (err error) {
if r.Version < 0 || r.Version > 5 {
if r.Version < 0 || r.Version > 7 {
return PacketEncodingError{"invalid or unsupported OffsetFetchRequest version field"}
}

if err = pe.putString(r.ConsumerGroup); err != nil {
isFlexible := r.Version >= 6

if isFlexible {
err = pe.putCompactString(r.ConsumerGroup)
} else {
err = pe.putString(r.ConsumerGroup)
}
if err != nil {
return err
}

if r.Version >= 2 && r.partitions == nil {
pe.putInt32(-1)
} else {
if err = pe.putArrayLength(len(r.partitions)); err != nil {
return err
if isFlexible {
if r.partitions == nil {
pe.putUVarint(0)
} else {
pe.putCompactArrayLength(len(r.partitions))
}
for topic, partitions := range r.partitions {
if err = pe.putString(topic); err != nil {
return err
}
if err = pe.putInt32Array(partitions); err != nil {
} else {
if r.partitions == nil && r.Version >= 2 {
pe.putInt32(-1)
} else {
if err = pe.putArrayLength(len(r.partitions)); err != nil {
return err
}
}
}

for topic, partitions := range r.partitions {
if isFlexible {
err = pe.putCompactString(topic)
} else {
err = pe.putString(topic)
}
if err != nil {
return err
}

//

if isFlexible {
err = pe.putCompactInt32Array(partitions)
} else {
err = pe.putInt32Array(partitions)
}
if err != nil {
return err
}

if isFlexible {
pe.putEmptyTaggedFieldArray()
}
}

if r.RequireStable && r.Version < 7 {
return PacketEncodingError{"requireStable is not supported. use version 7 or later"}
}

if r.Version >= 7 {
pe.putBool(r.RequireStable)
}

if isFlexible {
pe.putEmptyTaggedFieldArray()
}

return nil
}

func (r *OffsetFetchRequest) decode(pd packetDecoder, version int16) (err error) {
r.Version = version
if r.ConsumerGroup, err = pd.getString(); err != nil {
isFlexible := r.Version >= 6
if isFlexible {
r.ConsumerGroup, err = pd.getCompactString()
} else {
r.ConsumerGroup, err = pd.getString()
}
if err != nil {
return err
}
partitionCount, err := pd.getArrayLength()

var partitionCount int

if isFlexible {
partitionCount, err = pd.getCompactArrayLength()
} else {
partitionCount, err = pd.getArrayLength()
}
if err != nil {
return err
}

if (partitionCount == 0 && version < 2) || partitionCount < 0 {
return nil
}
r.partitions = make(map[string][]int32)

r.partitions = make(map[string][]int32, partitionCount)
for i := 0; i < partitionCount; i++ {
topic, err := pd.getString()
var topic string
if isFlexible {
topic, err = pd.getCompactString()
} else {
topic, err = pd.getString()
}
if err != nil {
return err
}
partitions, err := pd.getInt32Array()

var partitions []int32
if isFlexible {
partitions, err = pd.getCompactInt32Array()
} else {
partitions, err = pd.getInt32Array()
}
if err != nil {
return err
}
if isFlexible {
_, err = pd.getEmptyTaggedFieldArray()
if err != nil {
return err
}
}

r.partitions[topic] = partitions
}

if r.Version >= 7 {
r.RequireStable, err = pd.getBool()
if err != nil {
return err
}
}

if isFlexible {
_, err = pd.getEmptyTaggedFieldArray()
if err != nil {
return err
}
}

return nil
}

Expand All @@ -69,6 +164,10 @@ func (r *OffsetFetchRequest) version() int16 {
}

func (r *OffsetFetchRequest) headerVersion() int16 {
if r.Version >= 6 {
return 2
}

return 1
}

Expand All @@ -84,6 +183,10 @@ func (r *OffsetFetchRequest) requiredVersion() KafkaVersion {
return V2_0_0_0
case 5:
return V2_1_0_0
case 6:
return V2_4_0_0
case 7:
return V2_5_0_0
default:
return MinVersion
}
Expand Down
60 changes: 60 additions & 0 deletions offset_fetch_request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,12 @@ var (
0x00, 0x00,
0x00, 0x00, 0x00, 0x00}

offsetFetchRequestNoPartitionsV6 = []byte{
0x05, 'b', 'l', 'a', 'h', 0x01, 0x00}

offsetFetchRequestNoPartitionsV7 = []byte{
0x05, 'b', 'l', 'a', 'h', 0x01, 0x01, 0x00}

offsetFetchRequestNoPartitions = []byte{
0x00, 0x04, 'b', 'l', 'a', 'h',
0x00, 0x00, 0x00, 0x00}
Expand All @@ -21,6 +27,20 @@ var (
0x00, 0x00, 0x00, 0x01,
0x4F, 0x4F, 0x4F, 0x4F}

offsetFetchRequestOnePartitionV6 = []byte{
0x05, 'b', 'l', 'a', 'h',
0x02, 0x0E, 't', 'o', 'p', 'i', 'c', 'T', 'h', 'e', 'F', 'i', 'r', 's', 't',
0x02,
0x4F, 0x4F, 0x4F, 0x4F,
0x00, 0x00}

offsetFetchRequestOnePartitionV7 = []byte{
0x05, 'b', 'l', 'a', 'h',
0x02, 0x0E, 't', 'o', 'p', 'i', 'c', 'T', 'h', 'e', 'F', 'i', 'r', 's', 't',
0x02,
0x4F, 0x4F, 0x4F, 0x4F,
0x00, 0x00, 0x00}

offsetFetchRequestAllPartitions = []byte{
0x00, 0x04, 'b', 'l', 'a', 'h',
0xff, 0xff, 0xff, 0xff}
Expand All @@ -36,7 +56,29 @@ func TestOffsetFetchRequestNoPartitions(t *testing.T) {
request.ConsumerGroup = "blah"
testRequest(t, fmt.Sprintf("no partitions %d", version), request, offsetFetchRequestNoPartitions)
}

{ // v6
version := 6
request := new(OffsetFetchRequest)
request.Version = int16(version)
request.ConsumerGroup = "blah"
request.ZeroPartitions()

testRequest(t, fmt.Sprintf("no partitions %d", version), request, offsetFetchRequestNoPartitionsV6)
}

{ // v7
version := 7
request := new(OffsetFetchRequest)
request.Version = int16(version)
request.ConsumerGroup = "blah"
request.RequireStable = true
request.ZeroPartitions()

testRequest(t, fmt.Sprintf("no partitions %d", version), request, offsetFetchRequestNoPartitionsV7)
}
}

func TestOffsetFetchRequest(t *testing.T) {
for version := 0; version <= 5; version++ {
request := new(OffsetFetchRequest)
Expand All @@ -45,6 +87,24 @@ func TestOffsetFetchRequest(t *testing.T) {
request.AddPartition("topicTheFirst", 0x4F4F4F4F)
testRequest(t, fmt.Sprintf("one partition %d", version), request, offsetFetchRequestOnePartition)
}

{ //v6
version := 6
request := new(OffsetFetchRequest)
request.Version = int16(version)
request.ConsumerGroup = "blah"
request.AddPartition("topicTheFirst", 0x4F4F4F4F)
testRequest(t, fmt.Sprintf("one partition %d", version), request, offsetFetchRequestOnePartitionV6)
}

{ //v7
version := 7
request := new(OffsetFetchRequest)
request.Version = int16(version)
request.ConsumerGroup = "blah"
request.AddPartition("topicTheFirst", 0x4F4F4F4F)
testRequest(t, fmt.Sprintf("one partition %d", version), request, offsetFetchRequestOnePartitionV7)
}
}

func TestOffsetFetchRequestAllPartitions(t *testing.T) {
Expand Down
Loading

0 comments on commit c5e3fdd

Please sign in to comment.