
[C++] ReadNext in arrow::RecordBatchReader returns invalid status on second or subsequent items #41339

Closed
gbronner opened this issue Apr 22, 2024 · 4 comments


gbronner commented Apr 22, 2024

Describe the bug, including details regarding any error messages, version, and platform.

I have some code that iterates through the record batches of fairly large Parquet files.

The code is:

  std::shared_ptr<arrow::RecordBatch>* curBatch = &m_curBatch;
  auto status = m_reader->ReadNext(curBatch);

and the stack trace is:

#7  <signal handler called>
#8  0x00007fb7a284f5cd in parquet::internal::(anonymous namespace)::TypedRecordReader<parquet::PhysicalType<(parquet::Type::type)7> >::bytes_for_values(long) const [clone .isra.1197] () from /lib64/libparquet.so.1500
#9  0x00007fb7a28512bd in parquet::internal::(anonymous namespace)::TypedRecordReader<parquet::PhysicalType<(parquet::Type::type)5> >::ReserveValues(long) () from /lib64/libparquet.so.1500
#10 0x00007fb7a27e7b98 in parquet::arrow::(anonymous namespace)::LeafReader::LoadBatch(long) () from /lib64/libparquet.so.1500
#11 0x00007fb7a27f57d8 in parquet::arrow::ColumnReaderImpl::NextBatch(long, std::shared_ptr<arrow::ChunkedArray>*) () from /lib64/libparquet.so.1500
#12 0x00007fb7a27eeec0 in arrow::Result<arrow::Iterator<std::shared_ptr<arrow::RecordBatch> > > arrow::Iterator<arrow::Iterator<std::shared_ptr<arrow::RecordBatch> > >::Next<arrow::FunctionIterator<parquet::arrow::(anonymous namespace)::FileReaderImpl::GetRecordBatchReader(std::vector<int, std::allocator<int> > const&, std::vector<int, std::allocator<int> > const&, std::unique_ptr<arrow::RecordBatchReader, std::default_delete<arrow::RecordBatchReader> >*)::{lambda()#1}, arrow::Iterator<std::shared_ptr<arrow::RecordBatch> > > >(void*) () from /lib64/libparquet.so.1500
#13 0x00007fb7a27fa3ea in arrow::FlattenIterator<std::shared_ptr<arrow::RecordBatch> >::Next() () from /lib64/libparquet.so.1500
#14 0x00007fb7a27fa4aa in arrow::FlattenIterator<std::shared_ptr<arrow::RecordBatch> >::Next() () from /lib64/libparquet.so.1500
#15 0x00007fb7a27fa581 in arrow::Result<std::shared_ptr<arrow::RecordBatch> > arrow::Iterator<std::shared_ptr<arrow::RecordBatch> >::Next<arrow::FlattenIterator<std::shared_ptr<arrow::RecordBatch> > >(void*) () from /lib64/libparquet.so.1500
#16 0x00007fb7a27e7ac2 in parquet::arrow::(anonymous namespace)::RowGroupRecordBatchReader::ReadNext(std::shared_ptr<arrow::RecordBatch>*) () from /lib64/libparquet.so.1500


I'm at a loss as to why this would happen.

I've also seen errors like the following when working with extremely wide Parquet files:

Invalid: Buffer #1 too small in array of type int64 and length 3: expected at least 24 byte(s), got 0

The Parquet file is fine -- I can read it with ReadTable, pyarrow, etc. It even works if the batch size is sufficiently large to read it in one batch.

Any ideas as to why it would run out of buffers even though I'm only using a batch size of 3?

Component(s)

C++

@kou kou changed the title ReadNext in arrow::RecordBatchReader returns invalid status on second or subsequent items [C++] ReadNext in arrow::RecordBatchReader returns invalid status on second or subsequent items Apr 23, 2024

mapleFU commented Apr 23, 2024

Would you mind uploading the parquet file? I can't debug this quickly without it...

gbronner (Author) commented

test.parquet.gz

Here it is, gzipped because you can't upload a raw parquet file.


mapleFU commented Apr 24, 2024

Aha, I used the master branch and ran ReadInBatches from cpp/examples/arrow/parquet_read_write:

arrow::Status ReadInBatches(std::string path_to_file) {
  // #include "arrow/io/api.h"
  // #include "arrow/parquet/arrow/reader.h"

  arrow::MemoryPool* pool = arrow::default_memory_pool();

  // Configure general Parquet reader settings
  auto reader_properties = parquet::ReaderProperties(pool);
  reader_properties.set_buffer_size(4096 * 4);
  reader_properties.enable_buffered_stream();

  // Configure Arrow-specific Parquet reader settings
  auto arrow_reader_props = parquet::ArrowReaderProperties();
  arrow_reader_props.set_batch_size(3);  // default 64 * 1024
  arrow_reader_props.set_use_threads(true);

  parquet::arrow::FileReaderBuilder reader_builder;
  ARROW_RETURN_NOT_OK(
      reader_builder.OpenFile(path_to_file, /*memory_map=*/true, reader_properties));
  reader_builder.memory_pool(pool);
  reader_builder.properties(arrow_reader_props);

  std::unique_ptr<parquet::arrow::FileReader> arrow_reader;
  ARROW_ASSIGN_OR_RAISE(arrow_reader, reader_builder.Build());

  std::shared_ptr<::arrow::RecordBatchReader> rb_reader;
  ARROW_RETURN_NOT_OK(arrow_reader->GetRecordBatchReader(&rb_reader));

  std::shared_ptr<::arrow::RecordBatch> batch;
  // Stop on error or end of stream (ReadNext sets batch to nullptr at EOF).
  while (rb_reader->ReadNext(&batch).ok() && batch != nullptr) {
    std::cout << "Read:" << batch->ToString() << '\n';
  }

  // for (arrow::Result<std::shared_ptr<arrow::RecordBatch>> maybe_batch : *rb_reader) {
  //   if (!maybe_batch.ok()) {
  //     std::cout << "Error reading batch: " << maybe_batch.status().message() << std::endl;
  //   } else {
  //       std::shared_ptr<arrow::RecordBatch> batch = maybe_batch.ValueOrDie();
  //       std::cout << "Read batch with " << batch->num_rows() << " rows" << std::endl;
  //   }
  // }
  return arrow::Status::OK();
}

arrow::Status RunExamples(std::string path_to_file) {
//  ARROW_RETURN_NOT_OK(WriteFullFile(path_to_file));
//  ARROW_RETURN_NOT_OK(ReadFullFile(path_to_file));
//  ARROW_RETURN_NOT_OK(WriteInBatches(path_to_file));
  ARROW_RETURN_NOT_OK(ReadInBatches(path_to_file));
  return arrow::Status::OK();
}

This doesn't crash. I'm running the master branch on my M1 macOS machine. Would you mind providing your configuration?

By the way, the stack trace below is a little confusing 🤔: why would parquet::PhysicalType<(parquet::Type::type)5> call into parquet::PhysicalType<(parquet::Type::type)7>...


#8  0x00007fb7a284f5cd in parquet::internal::(anonymous namespace)::TypedRecordReader<parquet::PhysicalType<(parquet::Type::type)7> >::bytes_for_values(long) const [clone .isra.1197] () from /lib64/libparquet.so.1500
#9  0x00007fb7a28512bd in parquet::internal::(anonymous namespace)::TypedRecordReader<parquet::PhysicalType<(parquet::Type::type)5> >::ReserveValues(long) () from /lib64/libparquet.so.1500

gbronner (Author) commented

I think the issue I hit is that I'm creating the FileReader and the RecordBatchReader inside a function. It appears that the RecordBatchReader doesn't hold a reference to its parent parquet::arrow::FileReader, so you have to keep the FileReader alive yourself, alongside the RecordBatchReader (see the sketch below).

So it's not a bug, but it was very hard to figure out.
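
For illustration, a minimal sketch of that fix, assuming the same GetRecordBatchReader overload used earlier in this thread; BatchSource and OpenBatches are hypothetical names for this sketch, not Arrow APIs:

// A minimal sketch of keeping the parquet::arrow::FileReader alive alongside
// the arrow::RecordBatchReader it produced. BatchSource and OpenBatches are
// hypothetical names for illustration, not Arrow APIs.
#include <memory>
#include <string>

#include "arrow/record_batch.h"
#include "arrow/result.h"
#include "arrow/status.h"
#include "parquet/arrow/reader.h"

struct BatchSource {
  std::unique_ptr<parquet::arrow::FileReader> file_reader;  // must outlive rb_reader
  std::shared_ptr<arrow::RecordBatchReader> rb_reader;
};

arrow::Result<BatchSource> OpenBatches(const std::string& path) {
  BatchSource source;
  parquet::arrow::FileReaderBuilder builder;
  ARROW_RETURN_NOT_OK(builder.OpenFile(path));
  ARROW_ASSIGN_OR_RAISE(source.file_reader, builder.Build());
  // If only rb_reader escaped this scope, file_reader would be destroyed on
  // return, and subsequent ReadNext() calls would read from freed state.
  ARROW_RETURN_NOT_OK(source.file_reader->GetRecordBatchReader(&source.rb_reader));
  return source;
}

Returning BatchSource (rather than just rb_reader) keeps the FileReader alive for as long as batches are being read.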
