diff --git a/alter_partition_reassignments_request.go b/alter_partition_reassignments_request.go index 8b537d58e..776436e1b 100644 --- a/alter_partition_reassignments_request.go +++ b/alter_partition_reassignments_request.go @@ -6,7 +6,7 @@ type alterPartitionReassignmentsBlock struct { func (b *alterPartitionReassignmentsBlock) encode(pe packetEncoder) error { - if err := pe.putCompactInt32Array(b.replicas); err != nil { + if err := pe.putNullableCompactInt32Array(b.replicas); err != nil { return err } diff --git a/alter_partition_reassignments_request_test.go b/alter_partition_reassignments_request_test.go index 04192bd98..8d282729d 100644 --- a/alter_partition_reassignments_request_test.go +++ b/alter_partition_reassignments_request_test.go @@ -20,6 +20,16 @@ var ( 0, 0, 3, 233, // replica 1001 0, 0, 0, // empty tagged fields } + + alterPartitionReassignmentsAbortRequest = []byte{ + 0, 0, 39, 16, // timout 10000 + 2, // 2-1=1 block + 6, 116, 111, 112, 105, 99, // topic name "topic" as compact string + 2, // 2-1=1 partitions + 0, 0, 0, 0, // partitionId + 0, // replica array is null (indicates that a pending reassignment should be aborted) + 0, 0, 0, // empty tagged fields + } ) func TestAlterPartitionReassignmentRequest(t *testing.T) { @@ -35,4 +45,12 @@ func TestAlterPartitionReassignmentRequest(t *testing.T) { request.AddBlock("topic", 0, []int32{1000, 1001}) testRequest(t, "one block", request, alterPartitionReassignmentsRequestOneBlock) + + request = &AlterPartitionReassignmentsRequest{ + TimeoutMs: int32(10000), + Version: int16(0), + } + request.AddBlock("topic", 0, nil) + + testRequest(t, "abort assignment", request, alterPartitionReassignmentsAbortRequest) } diff --git a/packet_encoder.go b/packet_encoder.go index 6ae9d9766..50c735c04 100644 --- a/packet_encoder.go +++ b/packet_encoder.go @@ -27,6 +27,7 @@ type packetEncoder interface { putNullableString(in *string) error putStringArray(in []string) error putCompactInt32Array(in []int32) error + putNullableCompactInt32Array(in []int32) error putInt32Array(in []int32) error putInt64Array(in []int64) error putEmptyTaggedFieldArray() diff --git a/prep_encoder.go b/prep_encoder.go index af87a233b..5448c814f 100644 --- a/prep_encoder.go +++ b/prep_encoder.go @@ -2,6 +2,7 @@ package sarama import ( "encoding/binary" + "errors" "fmt" "math" @@ -131,6 +132,23 @@ func (pe *prepEncoder) putStringArray(in []string) error { } func (pe *prepEncoder) putCompactInt32Array(in []int32) error { + + if in == nil { + return errors.New("expected int32 array to be non null") + } + + pe.putUVarint(uint64(len(in)) + 1) + pe.length += 4 * len(in) + return nil +} + +func (pe *prepEncoder) putNullableCompactInt32Array(in []int32) error { + + if in == nil { + pe.putUVarint(0) + return nil + } + pe.putUVarint(uint64(len(in)) + 1) pe.length += 4 * len(in) return nil diff --git a/real_encoder.go b/real_encoder.go index 603d8418d..ba073f7d3 100644 --- a/real_encoder.go +++ b/real_encoder.go @@ -2,6 +2,7 @@ package sarama import ( "encoding/binary" + "errors" "github.com/rcrowley/go-metrics" ) @@ -131,6 +132,22 @@ func (re *realEncoder) putStringArray(in []string) error { } func (re *realEncoder) putCompactInt32Array(in []int32) error { + if in == nil { + return errors.New("expected int32 array to be non null") + } + // 0 represents a null array, so +1 has to be added + re.putUVarint(uint64(len(in)) + 1) + for _, val := range in { + re.putInt32(val) + } + return nil +} + +func (re *realEncoder) putNullableCompactInt32Array(in []int32) error { + if in == nil { + re.putUVarint(0) + return nil + } // 0 represents a null array, so +1 has to be added re.putUVarint(uint64(len(in)) + 1) for _, val := range in {