From f59db4906b40f7bee4e4a7a80824e43c55cecabb Mon Sep 17 00:00:00 2001 From: Gang Wu Date: Fri, 17 Jan 2025 01:01:12 +0800 Subject: [PATCH] AVRO-4111: [C++] Replace boost::iostreams with zlib library (#3290) * AVRO-4111: [C++] Replace boost::iostreams with zlib library * declare buf as uint8_t * fix lint * remove unused cmake variables --- .github/workflows/test-lang-c++-ARM.yml | 2 +- .github/workflows/test-lang-c++.yml | 2 +- lang/c++/CMakeLists.txt | 17 ++- lang/c++/impl/DataFile.cc | 160 +++++++++++++++--------- lang/c++/include/avro/DataFile.hh | 3 - 5 files changed, 114 insertions(+), 70 deletions(-) diff --git a/.github/workflows/test-lang-c++-ARM.yml b/.github/workflows/test-lang-c++-ARM.yml index f101eaeb2b5..759065b0894 100644 --- a/.github/workflows/test-lang-c++-ARM.yml +++ b/.github/workflows/test-lang-c++-ARM.yml @@ -44,7 +44,7 @@ jobs: - name: Install dependencies run: | sudo apt-get update -q - sudo apt-get install -q -y gcc g++ libboost-all-dev libfmt-dev cmake + sudo apt-get install -q -y gcc g++ libboost-all-dev libfmt-dev zlib1g-dev cmake - name: Build run: | diff --git a/.github/workflows/test-lang-c++.yml b/.github/workflows/test-lang-c++.yml index 61afa7ff61c..c0c66ceec1e 100644 --- a/.github/workflows/test-lang-c++.yml +++ b/.github/workflows/test-lang-c++.yml @@ -39,7 +39,7 @@ jobs: - uses: actions/checkout@v4 - name: Install Dependencies - run: sudo apt update && sudo apt-get install -qqy cppcheck libboost-all-dev libsnappy-dev libfmt-dev cmake + run: sudo apt update && sudo apt-get install -qqy cppcheck libboost-all-dev libsnappy-dev libfmt-dev zlib1g-dev cmake - name: Print Versions run: | diff --git a/lang/c++/CMakeLists.txt b/lang/c++/CMakeLists.txt index e6f70bffd0e..1fc19f90a2a 100644 --- a/lang/c++/CMakeLists.txt +++ b/lang/c++/CMakeLists.txt @@ -110,6 +110,13 @@ else (SNAPPY_FOUND) message("Disabled snappy codec. libsnappy not found.") endif (SNAPPY_FOUND) +find_package(ZLIB REQUIRED) +if (ZLIB_FOUND) + message("Enabled zlib codec") +else (ZLIB_FOUND) + message(FATAL_ERROR "ZLIB is not found") +endif (ZLIB_FOUND) + add_definitions (${Boost_LIB_DIAGNOSTIC_DEFINITIONS}) add_definitions (-DAVRO_VERSION="${AVRO_VERSION_MAJOR}.${AVRO_VERSION_MINOR}.${AVRO_VERSION_PATCH}") @@ -140,8 +147,8 @@ set_property (TARGET avrocpp APPEND PROPERTY COMPILE_DEFINITIONS AVRO_DYN_LINK) add_library (avrocpp_s STATIC ${AVRO_SOURCE_FILES}) -target_include_directories(avrocpp_s PRIVATE ${SNAPPY_INCLUDE_DIR}) -target_link_libraries(avrocpp_s ${Boost_LIBRARIES} ${SNAPPY_LIBRARIES} fmt::fmt-header-only) +target_include_directories(avrocpp_s PRIVATE ${SNAPPY_INCLUDE_DIR} ${ZLIB_INCLUDE_DIR}) +target_link_libraries(avrocpp_s ${Boost_LIBRARIES} ${SNAPPY_LIBRARIES} ${ZLIB_LIBRARIES} fmt::fmt-header-only) set_property (TARGET avrocpp avrocpp_s APPEND PROPERTY COMPILE_DEFINITIONS AVRO_SOURCE) @@ -152,8 +159,8 @@ set_target_properties (avrocpp PROPERTIES set_target_properties (avrocpp_s PROPERTIES VERSION ${AVRO_VERSION_MAJOR}.${AVRO_VERSION_MINOR}.${AVRO_VERSION_PATCH}) -target_link_libraries (avrocpp ${Boost_LIBRARIES} ${SNAPPY_LIBRARIES} fmt::fmt-header-only) -target_include_directories(avrocpp PRIVATE ${SNAPPY_INCLUDE_DIR}) +target_link_libraries (avrocpp ${Boost_LIBRARIES} ${SNAPPY_LIBRARIES} ${ZLIB_LIBRARIES} fmt::fmt-header-only) +target_include_directories(avrocpp PRIVATE ${SNAPPY_INCLUDE_DIR} ${ZLIB_INCLUDE_DIR}) target_include_directories(avrocpp PUBLIC $ @@ -209,7 +216,7 @@ if (AVRO_BUILD_TESTS) macro (unittest name) add_executable (${name} test/${name}.cc) - target_link_libraries (${name} avrocpp_s ${Boost_LIBRARIES} ${SNAPPY_LIBRARIES}) + target_link_libraries (${name} avrocpp_s ${Boost_LIBRARIES} ${SNAPPY_LIBRARIES} ${ZLIB_LIBRARIES}) add_test (NAME ${name} WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR} COMMAND ${CMAKE_CURRENT_BINARY_DIR}/${name}) endmacro (unittest) diff --git a/lang/c++/impl/DataFile.cc b/lang/c++/impl/DataFile.cc index 63ea7df2083..fb0eaa2ac84 100644 --- a/lang/c++/impl/DataFile.cc +++ b/lang/c++/impl/DataFile.cc @@ -23,15 +23,12 @@ #include #include -#include // for boost::crc_32_type -#include -#include -#include - #ifdef SNAPPY_CODEC_AVAILABLE #include #endif +#include + namespace avro { using std::copy; using std::istringstream; @@ -55,12 +52,8 @@ const string AVRO_SNAPPY_CODEC = "snappy"; const size_t minSyncInterval = 32; const size_t maxSyncInterval = 1u << 30; -boost::iostreams::zlib_params get_zlib_params() { - boost::iostreams::zlib_params ret; - ret.method = boost::iostreams::zlib::deflated; - ret.noheader = true; - return ret; -} +// Recommended by https://www.zlib.net/zlib_how.html +const size_t zlibBufGrowSize = 128 * 1024; } // namespace @@ -144,21 +137,45 @@ void DataFileWriterBase::sync() { std::unique_ptr in = memoryInputStream(*buffer_); copy(*in, *stream_); } else if (codec_ == DEFLATE_CODEC) { - std::vector buf; + std::vector buf; { - boost::iostreams::filtering_ostream os; - os.push(boost::iostreams::zlib_compressor(get_zlib_params())); - os.push(boost::iostreams::back_inserter(buf)); - const uint8_t *data; - size_t len; + z_stream zs; + zs.zalloc = Z_NULL; + zs.zfree = Z_NULL; + zs.opaque = Z_NULL; + + int ret = deflateInit2(&zs, Z_DEFAULT_COMPRESSION, Z_DEFLATED, -15, 8, Z_DEFAULT_STRATEGY); + if (ret != Z_OK) { + throw Exception("Failed to initialize deflate, error: {}", ret); + } std::unique_ptr input = memoryInputStream(*buffer_); - while (input->next(&data, &len)) { - boost::iostreams::write(os, reinterpret_cast(data), len); + const uint8_t *data; + size_t len; + while (ret != Z_STREAM_END && input->next(&data, &len)) { + zs.avail_in = static_cast(len); + zs.next_in = const_cast(data); + bool flush = (zs.total_in + len) >= buffer_->byteCount(); + do { + if (zs.total_out == buf.size()) { + buf.resize(buf.size() + zlibBufGrowSize); + } + zs.avail_out = static_cast(buf.size() - zs.total_out); + zs.next_out = buf.data() + zs.total_out; + ret = deflate(&zs, flush ? Z_FINISH : Z_NO_FLUSH); + if (ret == Z_STREAM_END) { + break; + } + if (ret != Z_OK) { + throw Exception("Failed to deflate, error: {}", ret); + } + } while (zs.avail_out == 0); } + + buf.resize(zs.total_out); + (void) deflateEnd(&zs); } // make sure all is flushed - std::unique_ptr in = memoryInputStream( - reinterpret_cast(buf.data()), buf.size()); + std::unique_ptr in = memoryInputStream(buf.data(), buf.size()); int64_t byteCount = buf.size(); avro::encode(*encoderPtr_, byteCount); encoderPtr_->flush(); @@ -167,35 +184,28 @@ void DataFileWriterBase::sync() { } else if (codec_ == SNAPPY_CODEC) { std::vector temp; std::string compressed; - boost::crc_32_type crc; - { - boost::iostreams::filtering_ostream os; - os.push(boost::iostreams::back_inserter(temp)); - const uint8_t *data; - size_t len; - std::unique_ptr input = memoryInputStream(*buffer_); - while (input->next(&data, &len)) { - boost::iostreams::write(os, reinterpret_cast(data), - len); - } - } // make sure all is flushed + const uint8_t *data; + size_t len; + std::unique_ptr input = memoryInputStream(*buffer_); + while (input->next(&data, &len)) { + temp.insert(temp.end(), reinterpret_cast(data), + reinterpret_cast(data) + len); + } - crc.process_bytes(reinterpret_cast(temp.data()), - temp.size()); // For Snappy, add the CRC32 checksum - auto checksum = crc(); + auto checksum = crc32(0, reinterpret_cast(temp.data()), + static_cast(temp.size())); // Now compress size_t compressed_size = snappy::Compress( reinterpret_cast(temp.data()), temp.size(), &compressed); + temp.clear(); - { - boost::iostreams::filtering_ostream os; - os.push(boost::iostreams::back_inserter(temp)); - boost::iostreams::write(os, compressed.c_str(), compressed_size); - } + temp.insert(temp.end(), compressed.c_str(), + compressed.c_str() + compressed_size); + temp.push_back(static_cast((checksum >> 24) & 0xFF)); temp.push_back(static_cast((checksum >> 16) & 0xFF)); temp.push_back(static_cast((checksum >> 8) & 0xFF)); @@ -285,8 +295,7 @@ void DataFileReaderBase::init(const ValidSchema &readerSchema) { static void drain(InputStream &in) { const uint8_t *p = nullptr; size_t n = 0; - while (in.next(&p, &n)) - ; + while (in.next(&p, &n)); } char hex(unsigned int x) { @@ -384,7 +393,6 @@ void DataFileReaderBase::readDataBlock() { dataStream_ = std::move(st); #ifdef SNAPPY_CODEC_AVAILABLE } else if (codec_ == SNAPPY_CODEC) { - boost::crc_32_type crc; uint32_t checksum = 0; compressed_.clear(); uncompressed.clear(); @@ -408,35 +416,67 @@ void DataFileReaderBase::readDataBlock() { throw Exception( "Snappy Compression reported an error when decompressing"); } - crc.process_bytes(uncompressed.c_str(), uncompressed.size()); - auto c = crc(); + auto c = crc32(0, reinterpret_cast(uncompressed.c_str()), + static_cast(uncompressed.size())); if (checksum != c) { throw Exception( "Checksum did not match for Snappy compression: Expected: {}, computed: {}", checksum, c); } - os_.reset(new boost::iostreams::filtering_istream()); - os_->push( - boost::iostreams::basic_array_source(uncompressed.c_str(), - uncompressed.size())); - std::unique_ptr in = istreamInputStream(*os_); + + std::unique_ptr in = memoryInputStream( + reinterpret_cast(uncompressed.c_str()), + uncompressed.size()); dataDecoder_->init(*in); dataStream_ = std::move(in); #endif } else { compressed_.clear(); - const uint8_t *data; - size_t len; - while (st->next(&data, &len)) { - compressed_.insert(compressed_.end(), data, data + len); + uncompressed.clear(); + + { + z_stream zs; + zs.zalloc = Z_NULL; + zs.zfree = Z_NULL; + zs.opaque = Z_NULL; + zs.avail_in = 0; + zs.next_in = Z_NULL; + + int ret = inflateInit2(&zs, /*windowBits=*/-15); + if (ret != Z_OK) { + throw Exception("Failed to initialize inflate, error: {}", ret); + } + + const uint8_t *data; + size_t len; + while (ret != Z_STREAM_END && st->next(&data, &len)) { + zs.avail_in = static_cast(len); + zs.next_in = const_cast(data); + do { + if (zs.total_out == uncompressed.size()) { + uncompressed.resize(uncompressed.size() + zlibBufGrowSize); + } + zs.avail_out = static_cast(uncompressed.size() - zs.total_out); + zs.next_out = reinterpret_cast(uncompressed.data() + zs.total_out); + ret = inflate(&zs, Z_NO_FLUSH); + if (ret == Z_STREAM_END) { + break; + } + if (ret != Z_OK) { + throw Exception("Failed to inflate, error: {}", ret); + } + } while (zs.avail_out == 0); + } + + uncompressed.resize(zs.total_out); + (void) inflateEnd(&zs); } - os_.reset(new boost::iostreams::filtering_istream()); - os_->push(boost::iostreams::zlib_decompressor(get_zlib_params())); - os_->push(boost::iostreams::basic_array_source( - compressed_.data(), compressed_.size())); - std::unique_ptr in = nonSeekableIstreamInputStream(*os_); + std::unique_ptr in = memoryInputStream( + reinterpret_cast(uncompressed.c_str()), + uncompressed.size()); + dataDecoder_->init(*in); dataStream_ = std::move(in); } diff --git a/lang/c++/include/avro/DataFile.hh b/lang/c++/include/avro/DataFile.hh index dcfddf774ff..6b2cdab5c19 100644 --- a/lang/c++/include/avro/DataFile.hh +++ b/lang/c++/include/avro/DataFile.hh @@ -31,8 +31,6 @@ #include #include -#include - namespace avro { /** Specify type of compression to use when writing data files. */ @@ -216,7 +214,6 @@ class AVRO_DECL DataFileReaderBase { DataFileSync sync_{}; // for compressed buffer - std::unique_ptr os_; std::vector compressed_; std::string uncompressed; void readHeader();