forked from IBM/sarama
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathdelete_offsets_request.go
92 lines (76 loc) · 1.69 KB
/
delete_offsets_request.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
package sarama
type DeleteOffsetsRequest struct {
Group string
partitions map[string][]int32
}
func (r *DeleteOffsetsRequest) encode(pe packetEncoder) (err error) {
err = pe.putString(r.Group)
if err != nil {
return err
}
if r.partitions == nil {
pe.putInt32(0)
} else {
if err = pe.putArrayLength(len(r.partitions)); err != nil {
return err
}
}
for topic, partitions := range r.partitions {
err = pe.putString(topic)
if err != nil {
return err
}
err = pe.putInt32Array(partitions)
if err != nil {
return err
}
}
return
}
func (r *DeleteOffsetsRequest) decode(pd packetDecoder, version int16) (err error) {
r.Group, err = pd.getString()
if err != nil {
return err
}
var partitionCount int
partitionCount, err = pd.getArrayLength()
if err != nil {
return err
}
if (partitionCount == 0 && version < 2) || partitionCount < 0 {
return nil
}
r.partitions = make(map[string][]int32, partitionCount)
for i := 0; i < partitionCount; i++ {
var topic string
topic, err = pd.getString()
if err != nil {
return err
}
var partitions []int32
partitions, err = pd.getInt32Array()
if err != nil {
return err
}
r.partitions[topic] = partitions
}
return nil
}
func (r *DeleteOffsetsRequest) key() int16 {
return 47
}
func (r *DeleteOffsetsRequest) version() int16 {
return 0
}
func (r *DeleteOffsetsRequest) headerVersion() int16 {
return 1
}
func (r *DeleteOffsetsRequest) requiredVersion() KafkaVersion {
return V2_4_0_0
}
func (r *DeleteOffsetsRequest) AddPartition(topic string, partitionID int32) {
if r.partitions == nil {
r.partitions = make(map[string][]int32)
}
r.partitions[topic] = append(r.partitions[topic], partitionID)
}