Consuming a compacted topic fails with ErrIncompleteResponse #1005

Closed
SamiHiltunen opened this issue Dec 15, 2017 · 2 comments · Fixed by #1006

SamiHiltunen commented Dec 15, 2017

Versions

Sarama Version: v1.15.0 - Bump changelog - 3b1b388
Kafka Version: 1.0.0
Go Version: 1.9.2

Configuration

Sarama:

cfg.Version = sarama.V1_0_0_0

The Kafka configuration itself has nothing noteworthy. The topic I am consuming is log compacted, which seems to be the critical factor here.

Logs

I couldn't find anything related in Kafka logs.

Offsets in compacted-topic:

38
40
42
43
45

Logs from my example:

go run main.go --kafka broker0:9092 --topic compacted-topic --partition 16 --offset 41
SARAMA 2017/12/15 11:12:44 Initializing new client
SARAMA 2017/12/15 11:12:44 ClientID is the default of 'sarama', you should consider setting it to something application-specific.
SARAMA 2017/12/15 11:12:44 ClientID is the default of 'sarama', you should consider setting it to something application-specific.
SARAMA 2017/12/15 11:12:44 client/metadata fetching metadata for all topics from broker broker0:9092
SARAMA 2017/12/15 11:12:44 Connected to broker at broker0:9092 (unregistered)
SARAMA 2017/12/15 11:12:44 client/brokers registered new broker #2 at broker2:9092
SARAMA 2017/12/15 11:12:44 client/brokers registered new broker #4 at broker4:9092
SARAMA 2017/12/15 11:12:44 client/brokers registered new broker #1 at broker1:9092
SARAMA 2017/12/15 11:12:44 client/brokers registered new broker #3 at broker3:9092
SARAMA 2017/12/15 11:12:44 client/brokers registered new broker #0 at broker0:9092
SARAMA 2017/12/15 11:12:44 Successfully initialized new client
SARAMA 2017/12/15 11:12:44 ClientID is the default of 'sarama', you should consider setting it to something application-specific.
SARAMA 2017/12/15 11:12:44 Connected to broker at broker2:9092 (registered as #2)
SARAMA 2017/12/15 11:12:45 consumer/broker/2 added subscription to compacted-topic/16
2017/12/15 11:12:45 
FETCH:
	compacted-topic/16/41
2017/12/15 11:12:45 partitionConsumer[16].chooseStartingOffset: offset: 41
sarama.partitionConsumer[compacted-topic/16].parseRecords: prelude: true, batch.FirstOffset: 39, rec.OffsetDelta: 1, offset: 40, child.offset: 41
sarama.partitionConsumer[compacted-topic/16].parseRecords: incomplete: false, len(messages): 0, batch: &{FirstOffset:39 PartitionLeaderEpoch:0 Version:2 Codec:2 Control:false LastOffsetDelta:2 FirstTimestamp:2017-12-13 10:19:18.805 +0100 CET MaxTimestamp:2017-12-13 10:19:18.805 +0100 CET ProducerID:-1 ProducerEpoch:-1 FirstSequence:-1 Records:[0xc4200aed20] PartialTrailingRecord:false compressedRecords:[] recordsLen:5343}

SARAMA 2017/12/15 11:12:45 kafka: error while consuming compacted-topic/16: kafka: response did not contain all the expected topic/partition blocks
SARAMA 2017/12/15 11:12:45 consumer/broker/2 abandoned subscription to compacted-topic/16 because kafka: response did not contain all the expected topic/partition blocks
SARAMA 2017/12/15 11:12:47 consumer/compacted-topic/16 finding new broker
SARAMA 2017/12/15 11:12:47 client/metadata fetching metadata for [compacted-topic] from broker broker0:9092
SARAMA 2017/12/15 11:12:47 consumer/broker/2 added subscription to compacted-topic/16
2017/12/15 11:12:47 
FETCH:
	compacted-topic/16/41
SARAMA 2017/12/15 11:12:47 kafka: error while consuming compacted-topic/16: kafka: response did not contain all the expected topic/partition blocks
SARAMA 2017/12/15 11:12:47 consumer/broker/2 abandoned subscription to compacted-topic/16 because kafka: response did not contain all the expected topic/partition blocks
^Csignal: interrupt

Problem Description

Sarama's partitionConsumer fails with an ErrIncompleteResponse in parseRecords. This happens when the consumer requests an offset that has been compacted away. As far as I can see, Sarama doesn't advance the requested offset; it keeps requesting the compacted offset and keeps failing. I guess the expected behaviour would be to ask for the next available offset, which in the example is offset 42.
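
To make the expected behaviour concrete, here is a minimal sketch that picks the next surviving offset for a requested one. It is illustrative only: `nextAvailableOffset` is a hypothetical helper, not part of Sarama, and it assumes the list of surviving offsets is known and sorted, which a real consumer would not have up front.

```go
package main

import (
	"fmt"
	"sort"
)

// nextAvailableOffset is a hypothetical helper (not a Sarama API).
// Given the sorted offsets that survived compaction, it returns the
// smallest offset that is >= requested, or false if there is none.
func nextAvailableOffset(surviving []int64, requested int64) (int64, bool) {
	i := sort.Search(len(surviving), func(i int) bool {
		return surviving[i] >= requested
	})
	if i == len(surviving) {
		return 0, false
	}
	return surviving[i], true
}

func main() {
	// Offsets present in compacted-topic, as listed above.
	surviving := []int64{38, 40, 42, 43, 45}

	next, ok := nextAvailableOffset(surviving, 41)
	fmt.Println(next, ok) // 42 true
}
```

With the offsets from this report, a request for 41 resolves to 42, which is the message the consumer should deliver first.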

Example

package main

import (
	"log"
	"os"

	"github.com/Shopify/sarama"
	"github.com/spf13/pflag"
)

var (
	kafka     = pflag.StringSlice("kafka", []string{"broker0:9092"}, "kafka address")
	topic     = pflag.String("topic", "compacted-topic", "topic to consume")
	partition = pflag.Int32("partition", 16, "partition to consume")
	offset    = pflag.Int64("offset", 41, "starting offset")
)

func init() {
	// Parse command-line flags before main runs.
	pflag.Parse()
}

func main() {
	cfg := sarama.NewConfig()
	// Use the Kafka 1.0.0 protocol version; this is what triggers the problem.
	cfg.Version = sarama.V1_0_0_0

	// Send Sarama's internal logging to stdout.
	sarama.Logger = log.New(os.Stdout, "SARAMA ", log.LstdFlags)

	client, err := sarama.NewClient(*kafka, cfg)
	if err != nil {
		log.Fatalf("error creating client: %v", err)
	}

	consumer, err := sarama.NewConsumerFromClient(client)
	if err != nil {
		log.Fatalf("error creating consumer: %v", err)
	}

	// Start consuming at the requested offset, which may have been compacted away.
	pc, err := consumer.ConsumePartition(*topic, *partition, *offset)
	if err != nil {
		log.Fatalf("error creating partition consumer: %v", err)
	}

	// Print the offset of every message received.
	for msg := range pc.Messages() {
		log.Printf("received msg offset: %v", msg.Offset)
	}
}

eapache commented Dec 15, 2017

The code should handle this already. It's not clear to me why https://github.com/SamiHiltunen/sarama/blob/3f78fbb6f7e5d4698239891bfe95af04ec7f9bc5/consumer.go#L499 is not getting hit for offset 42 and advancing the offset anyway. Or has Kafka started returning 0 messages when you request a compacted offset? Historically, it has always just returned the next message (no matter how many offsets were compacted in the middle), so this logic was working.
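
The logic described here can be sketched roughly as follows. This is a simplified illustration and not Sarama's actual consumer code: the `message` type and `deliver` function are hypothetical. It skips anything below the requested offset and advances the next offset to fetch past each delivered message.

```go
package main

import "fmt"

// message is a hypothetical stand-in for a consumed Kafka message.
type message struct {
	Offset int64
}

// deliver is an illustrative helper, not Sarama's implementation.
// It drops messages below the requested offset and returns the
// delivered messages plus the next offset to request.
func deliver(fetched []message, requested int64) ([]message, int64) {
	next := requested
	var out []message
	for _, m := range fetched {
		if m.Offset < next {
			continue // older than what was asked for
		}
		out = append(out, m)
		next = m.Offset + 1
	}
	return out, next
}

func main() {
	// Older behaviour: the fetch includes the next surviving message.
	out, next := deliver([]message{{40}, {42}, {43}}, 41)
	fmt.Println(len(out), next) // 2 44

	// If the fetch only contains messages below the requested offset,
	// nothing is delivered and the offset does not move.
	out, next = deliver([]message{{40}}, 41)
	fmt.Println(len(out), next) // 0 41
}
```

The second call shows how this approach can stall: when every returned message is below the requested offset, the next offset stays at 41 and the same fetch is repeated.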


SamiHiltunen commented Dec 15, 2017

This seems to be different in the new protocol version introduced in 0.11.0.0. When I run the same example against our older broker running 0.10.1.0, it works as expected: when Sarama fetches offset 41, it receives messages from 42 upwards. In 1.0.0, it seems that when you request 41, you get some number of messages before 41 (i.e. 32-40), but FirstOffset + LastOffsetDelta sums to 41, indicating that offset 41 was considered but no record was returned for it because it was compacted.

#1006 contains a small change that seems to work in my test.
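
One way to use the batch bounds described above is sketched below. This is an illustration under assumptions and not necessarily the change made in #1006: the `record` and `recordBatch` types are simplified stand-ins that reuse only the field names visible in the log output, and `nextOffset` is a hypothetical helper.

```go
package main

import "fmt"

// record and recordBatch are simplified, hypothetical stand-ins that
// reuse the field names shown in the log output above.
type record struct {
	OffsetDelta int64
}

type recordBatch struct {
	FirstOffset     int64
	LastOffsetDelta int64
	Records         []record
}

// nextOffset is an illustrative helper. It returns the offsets at or
// after the requested one, plus the next offset to fetch. If the batch
// covers offsets past the last delivered record, the next offset is
// moved beyond the end of the batch so the fetch cannot repeat.
func nextOffset(b recordBatch, requested int64) ([]int64, int64) {
	next := requested
	var delivered []int64
	for _, r := range b.Records {
		off := b.FirstOffset + r.OffsetDelta
		if off < next {
			continue
		}
		delivered = append(delivered, off)
		next = off + 1
	}
	// The batch spans FirstOffset..FirstOffset+LastOffsetDelta, even if
	// some of those offsets were compacted away.
	if last := b.FirstOffset + b.LastOffsetDelta; next <= last {
		next = last + 1
	}
	return delivered, next
}

func main() {
	// Values from the log: FirstOffset 39, LastOffsetDelta 2, and one
	// record with OffsetDelta 1 (offset 40). Requested offset is 41.
	b := recordBatch{
		FirstOffset:     39,
		LastOffsetDelta: 2,
		Records:         []record{{OffsetDelta: 1}},
	}
	delivered, next := nextOffset(b, 41)
	fmt.Println(len(delivered), next) // 0 42
}
```

With the values from the log, no record is delivered for offset 41, but the next fetch starts at 42 instead of retrying 41.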
