Skip to content

Commit

Permalink
add helper for putEmptyTaggedFieldArray
Browse files Browse the repository at this point in the history
  • Loading branch information
d-rk authored and sladkoff committed Feb 20, 2020
1 parent 314db3f commit 1cf58af
Show file tree
Hide file tree
Showing 12 changed files with 116 additions and 141 deletions.
48 changes: 16 additions & 32 deletions alter_partition_reassignments_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,28 +4,20 @@ type alterPartitionReassignmentsBlock struct {
replicas []int32
}

func (b *alterPartitionReassignmentsBlock) encode(pe packetEncoder, version int16) error {
pe.putCompactInt32Array(b.replicas)
// tagged field
pe.putInt8(0)
return nil
}
func (b *alterPartitionReassignmentsBlock) encode(pe packetEncoder) error {

func (b *alterPartitionReassignmentsBlock) decode(pd packetDecoder, version int16) (err error) {

replicaCount, err := pd.getCompactArrayLength()
if err != nil {
if err := pe.putCompactInt32Array(b.replicas); err != nil {
return err
}

b.replicas = make([]int32, replicaCount)
pe.putEmptyTaggedFieldArray()
return nil
}

for i := 0; i < replicaCount; i++ {
if replica, err := pd.getInt32(); err != nil {
return err
} else {
b.replicas[i] = replica
}
func (b *alterPartitionReassignmentsBlock) decode(pd packetDecoder) (err error) {

if b.replicas, err = pd.getCompactInt32Array(); err != nil {
return err
}
return nil
}
Expand All @@ -48,16 +40,14 @@ func (r *AlterPartitionReassignmentsRequest) encode(pe packetEncoder) error {
pe.putCompactArrayLength(len(partitions))
for partition, block := range partitions {
pe.putInt32(partition)
if err := block.encode(pe, r.Version); err != nil {
if err := block.encode(pe); err != nil {
return err
}
}
//another tagged field
pe.putInt8(0)
pe.putEmptyTaggedFieldArray()
}

//another tagged field
pe.putInt8(0)
pe.putEmptyTaggedFieldArray()

return nil
}
Expand Down Expand Up @@ -91,28 +81,22 @@ func (r *AlterPartitionReassignmentsRequest) decode(pd packetDecoder, version in
return err
}
block := &alterPartitionReassignmentsBlock{}
if err := block.decode(pd, r.Version); err != nil {
if err := block.decode(pd); err != nil {
return err
}
r.blocks[topic][partition] = block

// empty tagged fields array
_, err = pd.getUVarint()
if err != nil {
if _, err := pd.getEmptyTaggedFieldArray(); err != nil {
return err
}
}
// empty tagged fields array
_, err = pd.getUVarint()
if err != nil {
if _, err := pd.getEmptyTaggedFieldArray(); err != nil {
return err
}
}
}

// empty tagged fields array
_, err = pd.getUVarint()
if err != nil {
if _, err := pd.getEmptyTaggedFieldArray(); err != nil {
return err
}

Expand Down
53 changes: 0 additions & 53 deletions alter_partition_reassignments_request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,44 +20,6 @@ var (
0, 0, 3, 233, // replica 1001
0, 0, 0, // empty tagged fields
}

alterPartitionReassignmentsRequestTwoBlocks = []byte{
0, 0, 39, 16, // timout 10000
3, // 3-1=2 blocks
6, 116, 111, 112, 105, 99, // topic name "topic" as compact string
2, // 2-1=1 partitions
0, 0, 0, 0, // partitionId
3, // 3-1=2 replica array size
0, 0, 3, 232, // replica 1000
0, 0, 3, 233, // replica 1001
0, 0, // empty tagged fields
7, 116, 111, 112, 105, 99, 50, // topic name "topic2" as compact string
2, // 2-1=1 partitions
0, 0, 0, 1, // partitionId
3, // 3-1=2 replica array size
0, 0, 3, 233, // replica 1001
0, 0, 3, 234, // replica 1002
0, 0, // empty tagged fields
0, // empty tagged fields
}

alterPartitionReassignmentsRequestTwoPartitions = []byte{
0, 0, 39, 16, // timout 10000
2, // 2-1=1 block
6, 116, 111, 112, 105, 99, // topic name "topic" as compact string
3, // 3-1=2 partitions
0, 0, 0, 0, // partitionId
3, // 3-1=2 replica array size
0, 0, 3, 232, // replica 1000
0, 0, 3, 233, // replica 1001
0, // empty tagged fields
0, 0, 0, 1, // partitionId
3, // 3-1=2 replica array size
0, 0, 3, 233, // replica 1001
0, 0, 3, 234, // replica 1002
0, 0, // empty tagged fields
0, // empty tagged fields
}
)

func TestAlterPartitionReassignmentRequest(t *testing.T) {
Expand All @@ -73,19 +35,4 @@ func TestAlterPartitionReassignmentRequest(t *testing.T) {
request.AddBlock("topic", 0, []int32{1000, 1001})

testRequest(t, "one block", request, alterPartitionReassignmentsRequestOneBlock)

request.AddBlock("topic2", 1, []int32{1001, 1002})

testRequest(t, "two blocks", request, alterPartitionReassignmentsRequestTwoBlocks)

request = &AlterPartitionReassignmentsRequest{
TimeoutMs: int32(10000),
Version: int16(0),
}

request.AddBlock("topic", 0, []int32{1000, 1001})
request.AddBlock("topic", 1, []int32{1001, 1002})

testRequest(t, "two partitions", request, alterPartitionReassignmentsRequestTwoPartitions)

}
69 changes: 29 additions & 40 deletions alter_partition_reassignments_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,22 @@ func (b *alterPartitionReassignmentsErrorBlock) encode(pe packetEncoder) error {
if err := pe.putNullableCompactString(b.errorMessage); err != nil {
return err
}
// tagged field
pe.putUVarint(0)
pe.putEmptyTaggedFieldArray()

return nil
}

func (b *alterPartitionReassignmentsErrorBlock) decode(pd packetDecoder) (err error) {
errorCode, err := pd.getInt32()
errorCode, err := pd.getInt16()
if err != nil {
return err
}
b.errorCode = KError(errorCode)
b.errorMessage, err = pd.getCompactNullableString()

if _, err := pd.getEmptyTaggedFieldArray(); err != nil {
return err
}
return err
}

Expand Down Expand Up @@ -67,13 +70,10 @@ func (r *AlterPartitionReassignmentsResponse) encode(pe packetEncoder) error {
return err
}
}
// tagged field
pe.putUVarint(0)
pe.putEmptyTaggedFieldArray()
}

// tagged field
pe.putUVarint(0)

pe.putEmptyTaggedFieldArray()
return nil
}

Expand All @@ -96,55 +96,44 @@ func (r *AlterPartitionReassignmentsResponse) decode(pd packetDecoder, version i
}

numTopics, err := pd.getCompactArrayLength()
if err != nil || numTopics == 0 {
if err != nil {
return err
}

r.Errors = make(map[string]map[int32]*alterPartitionReassignmentsErrorBlock, numTopics)
for i := 0; i < int(numTopics); i++ {
name, err := pd.getCompactString()
if err != nil {
return err
}

ongoingPartitionReassignments, err := pd.getCompactArrayLength()
if err != nil {
return err
}

r.Errors[name] = make(map[int32]*alterPartitionReassignmentsErrorBlock, ongoingPartitionReassignments)

for j := 0; j < ongoingPartitionReassignments; j++ {
partition, err := pd.getInt32()
if err != nil {
return err
}
errorCode, err := pd.getInt16()
if numTopics > 0 {
r.Errors = make(map[string]map[int32]*alterPartitionReassignmentsErrorBlock, numTopics)
for i := 0; i < numTopics; i++ {
topic, err := pd.getCompactString()
if err != nil {
return err
}
errorMessage, err := pd.getCompactNullableString()

ongoingPartitionReassignments, err := pd.getCompactArrayLength()
if err != nil {
return err
}

if errorCode != 0 {
if errorMessage == nil {
errorMessage = new(string)
r.Errors[topic] = make(map[int32]*alterPartitionReassignmentsErrorBlock, ongoingPartitionReassignments)

for j := 0; j < ongoingPartitionReassignments; j++ {
partition, err := pd.getInt32()
if err != nil {
return err
}
block := &alterPartitionReassignmentsErrorBlock{}
if err := block.decode(pd); err != nil {
return err
}

r.AddError(name, partition, KError(errorCode), errorMessage)
r.Errors[topic][partition] = block
}

if _, err = pd.getEmptyTaggedFields(); err != nil {
if _, err = pd.getEmptyTaggedFieldArray(); err != nil {
return err
}
}
if _, err = pd.getEmptyTaggedFields(); err != nil {
return err
}
}
if _, err = pd.getEmptyTaggedFields(); err != nil {

if _, err = pd.getEmptyTaggedFieldArray(); err != nil {
return err
}

Expand Down
13 changes: 13 additions & 0 deletions broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -527,6 +527,19 @@ func (b *Broker) AlterPartitionReassignments(request *AlterPartitionReassignment
return response, nil
}

//ListPartitionReassignments sends a list partition reassignments request and
//returns list partition reassignments response
func (b *Broker) ListPartitionReassignments(request *ListPartitionReassignmentsRequest) (*ListPartitionReassignmentsResponse, error) {
response := new(ListPartitionReassignmentsResponse)

err := b.sendAndReceive(request, response)
if err != nil {
return nil, err
}

return response, nil
}

//DeleteRecords send a request to delete records and return delete record
//response or error
func (b *Broker) DeleteRecords(request *DeleteRecordsRequest) (*DeleteRecordsResponse, error) {
Expand Down
14 changes: 4 additions & 10 deletions list_partition_reassignments_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,10 @@ func (r *ListPartitionReassignmentsRequest) encode(pe packetEncoder) error {
return err
}

//another tagged field
pe.putInt8(0)
pe.putEmptyTaggedFieldArray()
}

//another tagged field
pe.putInt8(0)
pe.putEmptyTaggedFieldArray()

return nil
}
Expand Down Expand Up @@ -60,17 +58,13 @@ func (r *ListPartitionReassignmentsRequest) decode(pd packetDecoder, version int
}
r.blocks[topic][j] = partition
}
// empty tagged fields array
_, err = pd.getUVarint()
if err != nil {
if _, err := pd.getEmptyTaggedFieldArray(); err != nil {
return err
}
}
}

// empty tagged fields array
_, err = pd.getUVarint()
if err != nil {
if _, err := pd.getEmptyTaggedFieldArray(); err != nil {
return err
}

Expand Down
3 changes: 2 additions & 1 deletion packet_decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ type packetDecoder interface {
getArrayLength() (int, error)
getCompactArrayLength() (int, error)
getBool() (bool, error)
getEmptyTaggedFields() (int, error)
getEmptyTaggedFieldArray() (int, error)

// Collections
getBytes() ([]byte, error)
Expand All @@ -24,6 +24,7 @@ type packetDecoder interface {
getNullableString() (*string, error)
getCompactString() (string, error)
getCompactNullableString() (*string, error)
getCompactInt32Array() ([]int32, error)
getInt32Array() ([]int32, error)
getInt64Array() ([]int64, error)
getStringArray() ([]string, error)
Expand Down
1 change: 1 addition & 0 deletions packet_encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type packetEncoder interface {
putCompactInt32Array(in []int32) error
putInt32Array(in []int32) error
putInt64Array(in []int64) error
putEmptyTaggedFieldArray()

// Provide the current offset to record the batch size metric
offset() int
Expand Down
4 changes: 4 additions & 0 deletions prep_encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,10 @@ func (pe *prepEncoder) putInt64Array(in []int64) error {
return nil
}

func (pe *prepEncoder) putEmptyTaggedFieldArray() {
pe.putUVarint(0)
}

func (pe *prepEncoder) offset() int {
return pe.length
}
Expand Down
Loading

0 comments on commit 1cf58af

Please sign in to comment.