
Kafka batch reader #515

Merged: 9 commits into redpanda-data:dev from kafka-batch-reader on Jan 29, 2021
Conversation

@BenPope (Member) commented Jan 27, 2021

This PR replaces #464

kafka::fetch_response::partition_response::record_set contains optional<record_batch[]>, which deserves a type.

This utility supports (see the sketch after this list):

  • empty
  • size_bytes
  • last_offset - last offset of last batch
  • consume_batch - as a record_batch_adaptor
  • record_batch_reader::impl implementation
  • release() - release the underlying iobuf
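
A minimal sketch of that interface, assuming the surrounding redpanda types (model::record_batch_reader::impl, iobuf); the signatures here are illustrative, not the merged code:

/// Illustrative sketch only - names follow this PR's description.
class batch_reader final : public model::record_batch_reader::impl {
public:
    explicit batch_reader(iobuf buf)
      : _buf(std::move(buf)) {}

    bool empty() const { return _buf.empty(); }
    size_t size_bytes() const { return _buf.size_bytes(); }

    // Last offset of the last batch in the buffer
    model::offset last_offset() const;

    // Consume a single batch as the record batch adaptor
    kafka::kafka_batch_adaptor consume_batch();

    // Release the underlying iobuf
    iobuf release() && { return std::move(_buf); }

private:
    iobuf _buf;
};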

Major differences since #464:

  • Renamed consumer_records to batch_reader
  • Renamed member consume_record_batch to consume_batch
  • Renamed member _record_set to _buf
  • Support in request_reader and response_writer
  • Maintain nullable semantics by inverting the optionality
  • Removed logging - record_batch adaptor already logs
  • Reordered the patchset as it's a much closer drop-in replacement

Major differences after force push:

  • Rebased
  • Introduce an exception hierarchy (sketched below)
  • Improved error handling with exceptions
  • Tests for failure modes
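
A minimal sketch of what that exception type might look like; kafka::exception and its error member appear in the tests quoted later in this thread, but the exact shape (and the stand-in enum value) here is an assumption:

// Sketch only: an exception carrying a kafka::error_code, matching its
// use in the tests below (e.error == kafka::error_code::corrupt_message).
#include <cstdint>
#include <stdexcept>
#include <string>

namespace kafka {
// Stand-in for the existing kafka::error_code enum; 2 is the Kafka
// protocol value for CORRUPT_MESSAGE.
enum class error_code : int16_t { corrupt_message = 2 };

struct exception : std::runtime_error {
    exception(error_code ec, const std::string& msg)
      : std::runtime_error(msg)
      , error(ec) {}
    error_code error;
};
} // namespace kafka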

Checklist

When referencing a related issue, remember to migrate duplicate stories from the
external tracker. This is not relevant for most users.

@BenPope force-pushed the kafka-batch-reader branch 2 times, most recently from f49cf52 to 5be75ae (January 27, 2021 00:45)
struct record_batch_info {
model::offset base_offset{};
int32_t record_count{};
int32_t size_bytes{};

Contributor:
is this 0? why not set it to 0 explicitly? what's the default initializer of an int?

Contributor:
i know size_t / unsigned are specified. are ints well specified in every compiler?

@BenPope (Member Author), Jan 27, 2021:
I think this is zero initialization via value initialization via default member initialization.

Pretty sure it's 0 for all integral types on all compilers that accept this as valid syntax.

Happy to make it explicit to remove any doubt.
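
A self-contained illustration of that chain (the demo names are hypothetical):

#include <cassert>
#include <cstdint>

struct info_demo { // hypothetical stand-in for record_batch_info
    int64_t base_offset{};  // default member initializer: value-initialized
    int32_t record_count{}; // value initialization zero-initializes integrals
    int32_t size_bytes{};
};

int main() {
    info_demo i; // all members are 0 on every conforming compiler
    assert(i.base_offset == 0 && i.record_count == 0 && i.size_bytes == 0);
}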

Member:
seems like a good candidate for using a constructor if the whole object is always initialized
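
A sketch of that constructor variant (illustrative only); note it would give up the designated-initializer construction shown further down:

// Sketch of the suggested constructor variant - illustrative only.
struct record_batch_info {
    record_batch_info(
      model::offset base, int32_t count, int32_t size) noexcept
      : base_offset(base)
      , record_count(count)
      , size_bytes(size) {}

    model::offset base_offset;
    int32_t record_count;
    int32_t size_bytes;
};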

/// model::record_batch on the wire
///
/// The array bound is not serialized, c.f. array<kafka::thing>
class batch_reader final : public model::record_batch_reader::impl {

Contributor:
this looks really good.

@dotnwat (Member) left a comment:
looks good. i need to review again but i left a comment for discussion about the batch reader interface. it's not a lot of code so i'm not sure we need to spend a lot of time shooting for the most generic interface, but it might be pretty easy to do because the pattern discussed is already available for copying from storage.

return record_batch_info{
.base_offset = base_offset,
.record_count = record_count,
.size_bytes = size_bytes};

Member:
code here looks fine to me.

discussion: if this is all being done for speed, it might be even faster to index directly into the linearized bytes and grab the three values. there is also a helpful little thing in the tree called PERF_TEST you could use to see if this code above saves any cycles compared to decoding the entire header.
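
A hedged sketch of that direct-index idea (hypothetical helpers; offsets follow the Kafka v2 record batch layout, with baseOffset at byte 0, batchLength at byte 8, and recordCount at byte 57):

#include <cstdint>

// Hypothetical sketch of indexing directly into linearized bytes.
// Kafka's wire format is big-endian.
inline uint32_t read_be32(const unsigned char* p) {
    return (uint32_t(p[0]) << 24) | (uint32_t(p[1]) << 16)
           | (uint32_t(p[2]) << 8) | uint32_t(p[3]);
}
inline uint64_t read_be64(const unsigned char* p) {
    return (uint64_t(read_be32(p)) << 32) | read_be32(p + 4);
}

struct raw_batch_info { // stand-in for record_batch_info
    int64_t base_offset;
    int32_t batch_length;
    int32_t record_count;
};

inline raw_batch_info read_batch_info(const unsigned char* p) {
    return raw_batch_info{
      .base_offset = int64_t(read_be64(p)),       // baseOffset @ byte 0
      .batch_length = int32_t(read_be32(p + 8)),  // batchLength @ byte 8
      .record_count = int32_t(read_be32(p + 57)), // recordCount @ byte 57
    };
}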

src/v/kafka/protocol/batch_reader.cc (resolved thread)
auto crs = kafka::batch_reader(std::move(mem_res.data));

model::offset last_offset{};
while (!crs.empty()) {

Contributor:
Maybe check that the crs is not empty at the beginning?

Member Author:
It really should have 40 batches in it to start with. If it is empty, the later test for last_offset should definitely fail.
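
For context, a plausible completion of the elided loop above (the adaptor checks mirror the reader code quoted later in the thread):

// Illustrative completion of the test loop quoted above.
auto crs = kafka::batch_reader(std::move(mem_res.data));

model::offset last_offset{};
while (!crs.empty()) {
    auto kba = crs.consume_batch();
    BOOST_REQUIRE(kba.v2_format);
    BOOST_REQUIRE(kba.valid_crc);
    BOOST_REQUIRE(kba.batch);
    last_offset = kba.batch->last_offset();
}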

@dotnwat (Member) left a comment:
looks great. minor feedback about a couple of cosmetic things, and i think we need a test for the error cases of the record batch reader interface.

i also don't see the record batch reader interface being used other than in a test (am i missing something there?). if it isn't being used, are you planning to use it later?

finally, did you test some existing clients to make sure everything seems ok for other kafka clients?

struct record_batch_info {
model::offset base_offset{};
int32_t record_count{};
int32_t size_bytes{};

Member:
seems like a good candidate for using a constructor if the whole object is always initialized

return size_bytes - model::packed_record_batch_header_size;
}
model::offset last_offset() {
return base_offset + model::offset{record_count - 1};

Member:
there is also the lastOffsetDelta field in the header that could be used here?

Member Author:
Sure, I had thought it was int64_t.

Member:
i was only suggesting that the header has a field specifically designed (i think!) to be added to base_offset to give the last offset. but importantly, i don't think that the two calculations are always the same after compaction runs. we should verify this though.
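
A sketch of that alternative, with the compaction caveat in mind (illustrative, not the merged code):

// Illustrative alternative: derive last_offset from the header's
// lastOffsetDelta. Per the caveat above, this may diverge from
// base_offset + record_count - 1 after compaction runs.
model::offset last_offset_from_delta(
  model::offset base_offset, int32_t last_offset_delta) {
    return base_offset + model::offset{last_offset_delta};
}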

src/v/kafka/protocol/batch_reader.cc (resolved thread)
src/v/kafka/protocol/batch_reader.cc (resolved thread)
if (likely(kba.v2_format && kba.valid_crc && kba.batch)) {
batches.push_back(std::move(*kba.batch));
} else {
_do_load_slice_failed = true;

Member:
clear batches here?

Member Author:
I was tempted; then I could drop the _do_load_slice_failed member. WDYT?

@dotnwat (Member), Jan 27, 2021:
I kinda like _do_load_slice_failed because it makes the termination condition clear, but I don't have a strong opinion. Would it make sense to clear batches and immediately return an exceptional future? I'm not sure how you were planning on communicating an error to a consumer.

Member Author:
I thought about it as a better signalling mechanism, but a quick search through the codebase didn't reveal any other readers that do that, and I wasn't sure what was idiomatic.

Member:
I think log reader is the only record batch reader that has an opportunity to fail. Here is where it returns an exceptional future from do_load_slice:

https://github.com/vectorizedio/redpanda/blob/dev/src/v/storage/log_reader.cc#L297
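
Adapted to this reader, that pattern might look like the following (a sketch; seastar future types assumed, and the exception here is a placeholder for the hierarchy this PR introduces):

// Sketch of the exceptional-future pattern from log_reader, applied to
// batch_reader::do_load_slice. The exception type is a placeholder.
ss::future<model::record_batch_reader::storage_t>
batch_reader::do_load_slice(model::timeout_clock::time_point) {
    model::record_batch_reader::data_t batches;
    while (!empty()) {
        auto kba = consume_batch();
        if (likely(kba.v2_format && kba.valid_crc && kba.batch)) {
            batches.push_back(std::move(*kba.batch));
        } else {
            // Drop the partial slice and surface the parse failure.
            return ss::make_exception_future<
              model::record_batch_reader::storage_t>(
              std::runtime_error("invalid kafka record parsing"));
        }
    }
    return ss::make_ready_future<model::record_batch_reader::storage_t>(
      std::move(batches));
}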

src/v/kafka/protocol/tests/batch_reader_test.cc (outdated, resolved thread)
src/v/kafka/protocol/response_writer.h (resolved thread)

@BenPope (Member Author) commented Jan 27, 2021

> i also don't see the record batch reader interface being used other than in a test (am i missing something there?). if it isn't being used, are you planning to use it later?

I think it will be useful here: https://github.com/vectorizedio/redpanda/pull/457/files#r563178111

Or if that link doesn't work: https://github.com/vectorizedio/redpanda/pull/457/files#diff-a918eb17ad60e95fda4bd6a66bcfc19ae0244a5ad9ead86732abc22c83e079afR207

And I have plans to use it for json serialization in fetch.

> finally, did you test some existing clients to make sure everything seems ok for other kafka clients?

No, but I probably should.

@dotnwat (Member) commented Jan 28, 2021

> > i also don't see the record batch reader interface being used other than in a test (am i missing something there?). if it isn't being used, are you planning to use it later?
>
> I think it will be useful here: https://github.com/vectorizedio/redpanda/pull/457/files#r563178111
>
> Or if that link doesn't work: https://github.com/vectorizedio/redpanda/pull/457/files#diff-a918eb17ad60e95fda4bd6a66bcfc19ae0244a5ad9ead86732abc22c83e079afR207

Ah, nice. Makes sense

@graphcareful (Contributor) left a comment:

LGTM, super nice impl

last_offset = kba.batch->last_offset();
}

BOOST_REQUIRE_EQUAL(last_offset, expected_last_offset);

Contributor:

Maybe it would also be useful to assert the number of bytes processed/read against the expected value

kafka::fetch_response::partition_response::record_set
contains multiple record_batch, which deserves a type.

This utility currently supports:

* empty()
* size_bytes()
* release() - Release the underlying iobuf

TODO:
* last_offset - last offset of last batch
* consume_batch - as a kafka_batch_adaptor
* implement record_batch_reader::impl

Signed-off-by: Ben Pope <[email protected]>
@BenPope force-pushed the kafka-batch-reader branch from 5be75ae to 916a6a4 (January 28, 2021 21:39)
@BenPope requested review from dotnwat and emaxerrno January 28, 2021 22:41

@dotnwat (Member) left a comment:

this patch looks great, nice work! it's ready to merge pending the question about if (unlikely(record_count - 1 != last_offset_delta))

// they build the types bottoms up, not top down
+ sizeof(base_offset) + sizeof(batch_length);

if (unlikely(record_count - 1 != last_offset_delta)) {

Member:
did you verify that this property always holds? i'm not sure that it does, but this might be relevant to the investigation IBM/sarama#1006

Member Author:

I meant to remove that - it was initially there to check the implementation and that my understanding was correct. It makes sense that it may not hold for compacted topics. I'll remove it.

Member:

plz use --keep-base on the next rebase fixup :)

@BenPope (Member Author), Jan 29, 2021:

I don't usually move the base, I was somewhat forced into it (twice) :)

Member:

oh yeah, i was just thinking that the next force push might be like a 3-line change

sizeof(int8_t); // magic
static constexpr size_t lod_offset = crc_offset + //
sizeof(int32_t) + // crc
sizeof(int16_t); // attr

Member:
nice
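
The start of the crc_offset definition is cut off in the hunk above; an illustrative reconstruction of the full pair, following the v2 batch header layout:

// Illustrative reconstruction - the v2 batch header layout gives
// crc_offset == 17 and lod_offset == 23 (lastOffsetDelta).
static constexpr size_t crc_offset = //
  sizeof(int64_t) +                  // baseOffset
  sizeof(int32_t) +                  // batchLength
  sizeof(int32_t) +                  // partitionLeaderEpoch
  sizeof(int8_t);                    // magic
static constexpr size_t lod_offset = crc_offset + //
  sizeof(int32_t) +                  // crc
  sizeof(int16_t);                   // attributes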

"Invalid kafka record parsing: {}",
!kba.v2_format ? "not v2_format"
: !kba.valid_crc ? "invalid crc"
: "empty batch")));

Member:
this looks great

Member Author:

I think I got the correct order - it makes sense that magic is checked before crc, right?

Member:

does the order affect anything other than what message gets printed? seems like you could also do like "crc {} magic {} empty {}", crc, magic, empty or something along those lines?

Member Author:

It's just the printed message, but I like your version more.

I considered adding an enum (or flags) for the failure mode, and an exception type to hold it, which I could check in the test. But I thought the tests would be the only place it would be used.

Member Author:

ok, so valid_crc isn't set if magic != 2

runtime error: load of value 27, which is not a valid value for type 'const bool'

I'll leave it as it was.
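
For reference, a sketch of the suggested flat message with the guard that UBSan failure demands (valid_crc is only assigned once the magic check passes; fmt is assumed available, as elsewhere in the tree):

// Sketch of the suggested flat format; guard valid_crc so it is never
// read when the batch is not v2 format (it is left unset in that case).
auto msg = fmt::format(
  "Invalid kafka record parsing: v2_format {} valid_crc {} batch {}",
  kba.v2_format,
  kba.v2_format ? (kba.valid_crc ? "true" : "false") : "n/a",
  bool(kba.batch));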

model::consume_reader_to_memory(std::move(rdr), model::no_timeout).get(),
kafka::exception,
[](const kafka::exception& e) {
return e.error == kafka::error_code::corrupt_message;

Member:
these failure test cases are really nice
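
The snippet above elides the enclosing macro; the full assertion is presumably Boost.Test's BOOST_REQUIRE_EXCEPTION, along these lines:

// Presumed full form of the assertion quoted above (Boost.Test).
BOOST_REQUIRE_EXCEPTION(
  model::consume_reader_to_memory(std::move(rdr), model::no_timeout).get(),
  kafka::exception,
  [](const kafka::exception& e) {
      return e.error == kafka::error_code::corrupt_message;
  });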

Obtain the last offset of the last batch

Note: This relies on kafka_batch_adaptor which currently lives in v_kafka

Signed-off-by: Ben Pope <[email protected]>
Consume a single record_batch as a kafka_batch_adaptor

Signed-off-by: Ben Pope <[email protected]>
Replaces fetch_response::partition_response::record_set

Signed-off-by: Ben Pope <[email protected]>
Now that obtaining the last offset isn't destructive,
test that all produced records are consumed.

Signed-off-by: Ben Pope <[email protected]>
Drive-by fix: Remove some dead code.

Signed-off-by: Ben Pope <[email protected]>
@BenPope force-pushed the kafka-batch-reader branch from 916a6a4 to 87771a3 (January 29, 2021 00:59)
@BenPope requested a review from dotnwat January 29, 2021 01:02
@BenPope mentioned this pull request Jan 29, 2021
@dotnwat merged commit d7e1dcb into redpanda-data:dev on Jan 29, 2021