diff --git a/fetch_request.go b/fetch_request.go index f893aeff7..4da5a1d2d 100644 --- a/fetch_request.go +++ b/fetch_request.go @@ -1,11 +1,18 @@ package sarama type fetchRequestBlock struct { - Version int16 + Version int16 + // currentLeaderEpoch contains the current leader epoch of the partition. currentLeaderEpoch int32 - fetchOffset int64 - logStartOffset int64 - maxBytes int32 + // fetchOffset contains the message offset. + fetchOffset int64 + // logStartOffset contains the earliest available offset of the follower + // replica. The field is only used when the request is sent by the + // follower. + logStartOffset int64 + // maxBytes contains the maximum bytes to fetch from this partition. See + // KIP-74 for cases where this limit may not be honored. + maxBytes int32 } func (b *fetchRequestBlock) encode(pe packetEncoder, version int16) error { @@ -46,16 +53,38 @@ func (b *fetchRequestBlock) decode(pd packetDecoder, version int16) (err error) // https://issues.apache.org/jira/browse/KAFKA-2063 for a discussion of the issues leading up to that. The KIP is at // https://cwiki.apache.org/confluence/display/KAFKA/KIP-74%3A+Add+Fetch+Response+Size+Limit+in+Bytes type FetchRequest struct { - MaxWaitTime int32 - MinBytes int32 - MaxBytes int32 - Version int16 - Isolation IsolationLevel - SessionID int32 + // Version defines the protocol version to use for encode and decode + Version int16 + // ReplicaID contains the broker ID of the follower, of -1 if this request + // is from a consumer. + // ReplicaID int32 + // MaxWaitTime contains the maximum time in milliseconds to wait for the response. + MaxWaitTime int32 + // MinBytes contains the minimum bytes to accumulate in the response. + MinBytes int32 + // MaxBytes contains the maximum bytes to fetch. See KIP-74 for cases + // where this limit may not be honored. + MaxBytes int32 + // Isolation contains a This setting controls the visibility of + // transactional records. Using READ_UNCOMMITTED (isolation_level = 0) + // makes all records visible. With READ_COMMITTED (isolation_level = 1), + // non-transactional and COMMITTED transactional records are visible. To be + // more concrete, READ_COMMITTED returns all data from offsets smaller than + // the current LSO (last stable offset), and enables the inclusion of the + // list of aborted transactions in the result, which allows consumers to + // discard ABORTED transactional records + Isolation IsolationLevel + // SessionID contains the fetch session ID. + SessionID int32 + // SessionEpoch contains the epoch of the partition leader as known to the + // follower replica or a consumer. SessionEpoch int32 - blocks map[string]map[int32]*fetchRequestBlock - forgotten map[string][]int32 - RackID string + // blocks contains the topics to fetch. + blocks map[string]map[int32]*fetchRequestBlock + // forgotten contains in an incremental fetch request, the partitions to remove. + forgotten map[string][]int32 + // RackID contains a Rack ID of the consumer making this request + RackID string } type IsolationLevel int8 @@ -66,7 +95,7 @@ const ( ) func (r *FetchRequest) encode(pe packetEncoder) (err error) { - pe.putInt32(-1) // replica ID is always -1 for clients + pe.putInt32(-1) // ReplicaID is always -1 for clients pe.putInt32(r.MaxWaitTime) pe.putInt32(r.MinBytes) if r.Version >= 3 { diff --git a/fetch_response.go b/fetch_response.go index 5a392ec3a..fc2c1c714 100644 --- a/fetch_response.go +++ b/fetch_response.go @@ -9,7 +9,9 @@ import ( const invalidPreferredReplicaID = -1 type AbortedTransaction struct { - ProducerID int64 + // ProducerID contains the producer id associated with the aborted transaction. + ProducerID int64 + // FirstOffset contains the first offset in the aborted transaction. FirstOffset int64 } @@ -33,16 +35,28 @@ func (t *AbortedTransaction) encode(pe packetEncoder) (err error) { } type FetchResponseBlock struct { - Err KError - HighWaterMarkOffset int64 + // Err contains the error code, or 0 if there was no fetch error. + Err KError + // HighWatermarkOffset contains the current high water mark. + HighWaterMarkOffset int64 + // LastStableOffset contains the last stable offset (or LSO) of the + // partition. This is the last offset such that the state of all + // transactional records prior to this offset have been decided (ABORTED or + // COMMITTED) LastStableOffset int64 LastRecordsBatchOffset *int64 - LogStartOffset int64 - AbortedTransactions []*AbortedTransaction - PreferredReadReplica int32 - Records *Records // deprecated: use FetchResponseBlock.RecordsSet - RecordsSet []*Records - Partial bool + // LogStartOffset contains the current log start offset. + LogStartOffset int64 + // AbortedTransactions contains the aborted transactions. + AbortedTransactions []*AbortedTransaction + // PreferredReadReplica contains the preferred read replica for the + // consumer to use on its next fetch request + PreferredReadReplica int32 + // RecordsSet contains the record data. + RecordsSet []*Records + + Partial bool + Records *Records // deprecated: use FetchResponseBlock.RecordsSet } func (b *FetchResponseBlock) decode(pd packetDecoder, version int16) (err error) { @@ -233,11 +247,19 @@ func (b *FetchResponseBlock) getAbortedTransactions() []*AbortedTransaction { } type FetchResponse struct { - Blocks map[string]map[int32]*FetchResponseBlock - ThrottleTime time.Duration - ErrorCode int16 - SessionID int32 - Version int16 + // Version defines the protocol version to use for encode and decode + Version int16 + // ThrottleTime contains the duration in milliseconds for which the request + // was throttled due to a quota violation, or zero if the request did not + // violate any quota. + ThrottleTime time.Duration + // ErrorCode contains the top level response error code. + ErrorCode int16 + // SessionID contains the fetch session ID, or 0 if this is not part of a fetch session. + SessionID int32 + // Blocks contains the response topics. + Blocks map[string]map[int32]*FetchResponseBlock + LogAppendTime bool Timestamp time.Time }