Skip to content

Commit

Permalink
Merge pull request #2289 from Shopify/dnwe/doc
Browse files Browse the repository at this point in the history
chore: document Fetch protocol fields
  • Loading branch information
dnwe authored Jul 22, 2022
2 parents 739ace7 + 4a375ec commit 845935f
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 28 deletions.
57 changes: 43 additions & 14 deletions fetch_request.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,18 @@
package sarama

type fetchRequestBlock struct {
Version int16
Version int16
// currentLeaderEpoch contains the current leader epoch of the partition.
currentLeaderEpoch int32
fetchOffset int64
logStartOffset int64
maxBytes int32
// fetchOffset contains the message offset.
fetchOffset int64
// logStartOffset contains the earliest available offset of the follower
// replica. The field is only used when the request is sent by the
// follower.
logStartOffset int64
// maxBytes contains the maximum bytes to fetch from this partition. See
// KIP-74 for cases where this limit may not be honored.
maxBytes int32
}

func (b *fetchRequestBlock) encode(pe packetEncoder, version int16) error {
Expand Down Expand Up @@ -46,16 +53,38 @@ func (b *fetchRequestBlock) decode(pd packetDecoder, version int16) (err error)
// https://issues.apache.org/jira/browse/KAFKA-2063 for a discussion of the issues leading up to that. The KIP is at
// https://cwiki.apache.org/confluence/display/KAFKA/KIP-74%3A+Add+Fetch+Response+Size+Limit+in+Bytes
type FetchRequest struct {
MaxWaitTime int32
MinBytes int32
MaxBytes int32
Version int16
Isolation IsolationLevel
SessionID int32
// Version defines the protocol version to use for encode and decode
Version int16
// ReplicaID contains the broker ID of the follower, of -1 if this request
// is from a consumer.
// ReplicaID int32
// MaxWaitTime contains the maximum time in milliseconds to wait for the response.
MaxWaitTime int32
// MinBytes contains the minimum bytes to accumulate in the response.
MinBytes int32
// MaxBytes contains the maximum bytes to fetch. See KIP-74 for cases
// where this limit may not be honored.
MaxBytes int32
// Isolation contains a This setting controls the visibility of
// transactional records. Using READ_UNCOMMITTED (isolation_level = 0)
// makes all records visible. With READ_COMMITTED (isolation_level = 1),
// non-transactional and COMMITTED transactional records are visible. To be
// more concrete, READ_COMMITTED returns all data from offsets smaller than
// the current LSO (last stable offset), and enables the inclusion of the
// list of aborted transactions in the result, which allows consumers to
// discard ABORTED transactional records
Isolation IsolationLevel
// SessionID contains the fetch session ID.
SessionID int32
// SessionEpoch contains the epoch of the partition leader as known to the
// follower replica or a consumer.
SessionEpoch int32
blocks map[string]map[int32]*fetchRequestBlock
forgotten map[string][]int32
RackID string
// blocks contains the topics to fetch.
blocks map[string]map[int32]*fetchRequestBlock
// forgotten contains in an incremental fetch request, the partitions to remove.
forgotten map[string][]int32
// RackID contains a Rack ID of the consumer making this request
RackID string
}

type IsolationLevel int8
Expand All @@ -66,7 +95,7 @@ const (
)

func (r *FetchRequest) encode(pe packetEncoder) (err error) {
pe.putInt32(-1) // replica ID is always -1 for clients
pe.putInt32(-1) // ReplicaID is always -1 for clients
pe.putInt32(r.MaxWaitTime)
pe.putInt32(r.MinBytes)
if r.Version >= 3 {
Expand Down
50 changes: 36 additions & 14 deletions fetch_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ import (
const invalidPreferredReplicaID = -1

type AbortedTransaction struct {
ProducerID int64
// ProducerID contains the producer id associated with the aborted transaction.
ProducerID int64
// FirstOffset contains the first offset in the aborted transaction.
FirstOffset int64
}

Expand All @@ -33,16 +35,28 @@ func (t *AbortedTransaction) encode(pe packetEncoder) (err error) {
}

type FetchResponseBlock struct {
Err KError
HighWaterMarkOffset int64
// Err contains the error code, or 0 if there was no fetch error.
Err KError
// HighWatermarkOffset contains the current high water mark.
HighWaterMarkOffset int64
// LastStableOffset contains the last stable offset (or LSO) of the
// partition. This is the last offset such that the state of all
// transactional records prior to this offset have been decided (ABORTED or
// COMMITTED)
LastStableOffset int64
LastRecordsBatchOffset *int64
LogStartOffset int64
AbortedTransactions []*AbortedTransaction
PreferredReadReplica int32
Records *Records // deprecated: use FetchResponseBlock.RecordsSet
RecordsSet []*Records
Partial bool
// LogStartOffset contains the current log start offset.
LogStartOffset int64
// AbortedTransactions contains the aborted transactions.
AbortedTransactions []*AbortedTransaction
// PreferredReadReplica contains the preferred read replica for the
// consumer to use on its next fetch request
PreferredReadReplica int32
// RecordsSet contains the record data.
RecordsSet []*Records

Partial bool
Records *Records // deprecated: use FetchResponseBlock.RecordsSet
}

func (b *FetchResponseBlock) decode(pd packetDecoder, version int16) (err error) {
Expand Down Expand Up @@ -233,11 +247,19 @@ func (b *FetchResponseBlock) getAbortedTransactions() []*AbortedTransaction {
}

type FetchResponse struct {
Blocks map[string]map[int32]*FetchResponseBlock
ThrottleTime time.Duration
ErrorCode int16
SessionID int32
Version int16
// Version defines the protocol version to use for encode and decode
Version int16
// ThrottleTime contains the duration in milliseconds for which the request
// was throttled due to a quota violation, or zero if the request did not
// violate any quota.
ThrottleTime time.Duration
// ErrorCode contains the top level response error code.
ErrorCode int16
// SessionID contains the fetch session ID, or 0 if this is not part of a fetch session.
SessionID int32
// Blocks contains the response topics.
Blocks map[string]map[int32]*FetchResponseBlock

LogAppendTime bool
Timestamp time.Time
}
Expand Down

0 comments on commit 845935f

Please sign in to comment.