Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: document Fetch protocol fields #2289

Merged
merged 1 commit into from
Jul 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 43 additions & 14 deletions fetch_request.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,18 @@
package sarama

type fetchRequestBlock struct {
Version int16
Version int16
// currentLeaderEpoch contains the current leader epoch of the partition.
currentLeaderEpoch int32
fetchOffset int64
logStartOffset int64
maxBytes int32
// fetchOffset contains the message offset.
fetchOffset int64
// logStartOffset contains the earliest available offset of the follower
// replica. The field is only used when the request is sent by the
// follower.
logStartOffset int64
// maxBytes contains the maximum bytes to fetch from this partition. See
// KIP-74 for cases where this limit may not be honored.
maxBytes int32
}

func (b *fetchRequestBlock) encode(pe packetEncoder, version int16) error {
Expand Down Expand Up @@ -46,16 +53,38 @@ func (b *fetchRequestBlock) decode(pd packetDecoder, version int16) (err error)
// https://issues.apache.org/jira/browse/KAFKA-2063 for a discussion of the issues leading up to that. The KIP is at
// https://cwiki.apache.org/confluence/display/KAFKA/KIP-74%3A+Add+Fetch+Response+Size+Limit+in+Bytes
type FetchRequest struct {
MaxWaitTime int32
MinBytes int32
MaxBytes int32
Version int16
Isolation IsolationLevel
SessionID int32
// Version defines the protocol version to use for encode and decode
Version int16
// ReplicaID contains the broker ID of the follower, of -1 if this request
// is from a consumer.
// ReplicaID int32
// MaxWaitTime contains the maximum time in milliseconds to wait for the response.
MaxWaitTime int32
// MinBytes contains the minimum bytes to accumulate in the response.
MinBytes int32
// MaxBytes contains the maximum bytes to fetch. See KIP-74 for cases
// where this limit may not be honored.
MaxBytes int32
// Isolation contains a This setting controls the visibility of
// transactional records. Using READ_UNCOMMITTED (isolation_level = 0)
// makes all records visible. With READ_COMMITTED (isolation_level = 1),
// non-transactional and COMMITTED transactional records are visible. To be
// more concrete, READ_COMMITTED returns all data from offsets smaller than
// the current LSO (last stable offset), and enables the inclusion of the
// list of aborted transactions in the result, which allows consumers to
// discard ABORTED transactional records
Isolation IsolationLevel
// SessionID contains the fetch session ID.
SessionID int32
// SessionEpoch contains the epoch of the partition leader as known to the
// follower replica or a consumer.
SessionEpoch int32
blocks map[string]map[int32]*fetchRequestBlock
forgotten map[string][]int32
RackID string
// blocks contains the topics to fetch.
blocks map[string]map[int32]*fetchRequestBlock
// forgotten contains in an incremental fetch request, the partitions to remove.
forgotten map[string][]int32
// RackID contains a Rack ID of the consumer making this request
RackID string
}

type IsolationLevel int8
Expand All @@ -66,7 +95,7 @@ const (
)

func (r *FetchRequest) encode(pe packetEncoder) (err error) {
pe.putInt32(-1) // replica ID is always -1 for clients
pe.putInt32(-1) // ReplicaID is always -1 for clients
pe.putInt32(r.MaxWaitTime)
pe.putInt32(r.MinBytes)
if r.Version >= 3 {
Expand Down
50 changes: 36 additions & 14 deletions fetch_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ import (
const invalidPreferredReplicaID = -1

type AbortedTransaction struct {
ProducerID int64
// ProducerID contains the producer id associated with the aborted transaction.
ProducerID int64
// FirstOffset contains the first offset in the aborted transaction.
FirstOffset int64
}

Expand All @@ -33,16 +35,28 @@ func (t *AbortedTransaction) encode(pe packetEncoder) (err error) {
}

type FetchResponseBlock struct {
Err KError
HighWaterMarkOffset int64
// Err contains the error code, or 0 if there was no fetch error.
Err KError
// HighWatermarkOffset contains the current high water mark.
HighWaterMarkOffset int64
// LastStableOffset contains the last stable offset (or LSO) of the
// partition. This is the last offset such that the state of all
// transactional records prior to this offset have been decided (ABORTED or
// COMMITTED)
LastStableOffset int64
LastRecordsBatchOffset *int64
LogStartOffset int64
AbortedTransactions []*AbortedTransaction
PreferredReadReplica int32
Records *Records // deprecated: use FetchResponseBlock.RecordsSet
RecordsSet []*Records
Partial bool
// LogStartOffset contains the current log start offset.
LogStartOffset int64
// AbortedTransactions contains the aborted transactions.
AbortedTransactions []*AbortedTransaction
// PreferredReadReplica contains the preferred read replica for the
// consumer to use on its next fetch request
PreferredReadReplica int32
// RecordsSet contains the record data.
RecordsSet []*Records

Partial bool
Records *Records // deprecated: use FetchResponseBlock.RecordsSet
}

func (b *FetchResponseBlock) decode(pd packetDecoder, version int16) (err error) {
Expand Down Expand Up @@ -233,11 +247,19 @@ func (b *FetchResponseBlock) getAbortedTransactions() []*AbortedTransaction {
}

type FetchResponse struct {
Blocks map[string]map[int32]*FetchResponseBlock
ThrottleTime time.Duration
ErrorCode int16
SessionID int32
Version int16
// Version defines the protocol version to use for encode and decode
Version int16
// ThrottleTime contains the duration in milliseconds for which the request
// was throttled due to a quota violation, or zero if the request did not
// violate any quota.
ThrottleTime time.Duration
// ErrorCode contains the top level response error code.
ErrorCode int16
// SessionID contains the fetch session ID, or 0 if this is not part of a fetch session.
SessionID int32
// Blocks contains the response topics.
Blocks map[string]map[int32]*FetchResponseBlock

LogAppendTime bool
Timestamp time.Time
}
Expand Down