Skip to content

Commit

Permalink
make sure partition reassignment can be aborted
Browse files Browse the repository at this point in the history
  • Loading branch information
d-rk authored and sladkoff committed Feb 20, 2020
1 parent 023606a commit aaf9f16
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 1 deletion.
2 changes: 1 addition & 1 deletion alter_partition_reassignments_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ type alterPartitionReassignmentsBlock struct {

func (b *alterPartitionReassignmentsBlock) encode(pe packetEncoder) error {

if err := pe.putCompactInt32Array(b.replicas); err != nil {
if err := pe.putNullableCompactInt32Array(b.replicas); err != nil {
return err
}

Expand Down
18 changes: 18 additions & 0 deletions alter_partition_reassignments_request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,16 @@ var (
0, 0, 3, 233, // replica 1001
0, 0, 0, // empty tagged fields
}

alterPartitionReassignmentsAbortRequest = []byte{
0, 0, 39, 16, // timout 10000
2, // 2-1=1 block
6, 116, 111, 112, 105, 99, // topic name "topic" as compact string
2, // 2-1=1 partitions
0, 0, 0, 0, // partitionId
0, // replica array is null (indicates that a pending reassignment should be aborted)
0, 0, 0, // empty tagged fields
}
)

func TestAlterPartitionReassignmentRequest(t *testing.T) {
Expand All @@ -35,4 +45,12 @@ func TestAlterPartitionReassignmentRequest(t *testing.T) {
request.AddBlock("topic", 0, []int32{1000, 1001})

testRequest(t, "one block", request, alterPartitionReassignmentsRequestOneBlock)

request = &AlterPartitionReassignmentsRequest{
TimeoutMs: int32(10000),
Version: int16(0),
}
request.AddBlock("topic", 0, nil)

testRequest(t, "abort assignment", request, alterPartitionReassignmentsAbortRequest)
}
1 change: 1 addition & 0 deletions packet_encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type packetEncoder interface {
putNullableString(in *string) error
putStringArray(in []string) error
putCompactInt32Array(in []int32) error
putNullableCompactInt32Array(in []int32) error
putInt32Array(in []int32) error
putInt64Array(in []int64) error
putEmptyTaggedFieldArray()
Expand Down
18 changes: 18 additions & 0 deletions prep_encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package sarama

import (
"encoding/binary"
"errors"
"fmt"
"math"

Expand Down Expand Up @@ -131,6 +132,23 @@ func (pe *prepEncoder) putStringArray(in []string) error {
}

func (pe *prepEncoder) putCompactInt32Array(in []int32) error {

if in == nil {
return errors.New("expected int32 array to be non null")
}

pe.putUVarint(uint64(len(in)) + 1)
pe.length += 4 * len(in)
return nil
}

func (pe *prepEncoder) putNullableCompactInt32Array(in []int32) error {

if in == nil {
pe.putUVarint(0)
return nil
}

pe.putUVarint(uint64(len(in)) + 1)
pe.length += 4 * len(in)
return nil
Expand Down
17 changes: 17 additions & 0 deletions real_encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package sarama

import (
"encoding/binary"
"errors"

"github.com/rcrowley/go-metrics"
)
Expand Down Expand Up @@ -131,6 +132,22 @@ func (re *realEncoder) putStringArray(in []string) error {
}

func (re *realEncoder) putCompactInt32Array(in []int32) error {
if in == nil {
return errors.New("expected int32 array to be non null")
}
// 0 represents a null array, so +1 has to be added
re.putUVarint(uint64(len(in)) + 1)
for _, val := range in {
re.putInt32(val)
}
return nil
}

func (re *realEncoder) putNullableCompactInt32Array(in []int32) error {
if in == nil {
re.putUVarint(0)
return nil
}
// 0 represents a null array, so +1 has to be added
re.putUVarint(uint64(len(in)) + 1)
for _, val := range in {
Expand Down

0 comments on commit aaf9f16

Please sign in to comment.