Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support sparse serialization #245

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions clickhouse/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ SET ( clickhouse-cpp-lib-src
columns/nullable.cpp
columns/numeric.cpp
columns/map.cpp
columns/serialization.cpp
columns/string.cpp
columns/tuple.cpp
columns/uuid.cpp
Expand Down Expand Up @@ -120,6 +121,7 @@ INSTALL(FILES columns/lowcardinality.h DESTINATION include/clickhouse/columns/)
INSTALL(FILES columns/nullable.h DESTINATION include/clickhouse/columns/)
INSTALL(FILES columns/numeric.h DESTINATION include/clickhouse/columns/)
INSTALL(FILES columns/map.h DESTINATION include/clickhouse/columns/)
INSTALL(FILES columns/serialization.h DESTINATION include/clickhouse/columns/)
INSTALL(FILES columns/string.h DESTINATION include/clickhouse/columns/)
INSTALL(FILES columns/tuple.h DESTINATION include/clickhouse/columns/)
INSTALL(FILES columns/utils.h DESTINATION include/clickhouse/columns/)
Expand Down
38 changes: 37 additions & 1 deletion clickhouse/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,11 @@
#define DBMS_MIN_REVISION_WITH_DISTRIBUTED_DEPTH 54448
#define DBMS_MIN_REVISION_WITH_INITIAL_QUERY_START_TIME 54449
#define DBMS_MIN_REVISION_WITH_INCREMENTAL_PROFILE_EVENTS 54451
// #define DBMS_MIN_REVISION_WITH_AGGREGATE_FUNCTIONS_VERSIONING 54452
#define DBMS_MIN_REVISION_WITH_PARALLEL_REPLICAS 54453
#define DBMS_MIN_REVISION_WITH_CUSTOM_SERIALIZATION 54454

#define REVISION DBMS_MIN_REVISION_WITH_INCREMENTAL_PROFILE_EVENTS
#define REVISION DBMS_MIN_REVISION_WITH_CUSTOM_SERIALIZATION

namespace clickhouse {

Expand Down Expand Up @@ -552,7 +555,19 @@ bool Client::Impl::ReadBlock(InputStream& input, Block* block) {
return false;
}

uint8_t has_custom_serialization = 0;
if (REVISION >= DBMS_MIN_REVISION_WITH_CUSTOM_SERIALIZATION) {
if (!WireFormat::ReadFixed(input, &has_custom_serialization)) {
return false;
}
}

if (ColumnRef col = CreateColumnByType(type, create_column_settings)) {

if (has_custom_serialization) {
col->LoadSerializationKind(&input);
}

if (num_rows && !col->Load(&input, num_rows)) {
throw ProtocolError("can't load column '" + name + "' of type " + type);
}
Expand Down Expand Up @@ -708,6 +723,16 @@ void Client::Impl::SendQuery(const Query& query) {
throw UnimplementedError(std::string("Can't send open telemetry tracing context to a server, server version is too old"));
}
}

if (server_info_.revision >= DBMS_MIN_REVISION_WITH_PARALLEL_REPLICAS)
{
// collaborate_with_initiator
WireFormat::WriteUInt64 (*output_, 0u);
// count_participating_replicas
WireFormat::WriteUInt64 (*output_, 0u);
// number_of_current_replica
WireFormat::WriteUInt64 (*output_, 0u);
}
}

/// Per query settings
Expand Down Expand Up @@ -757,6 +782,17 @@ void Client::Impl::WriteBlock(const Block& block, OutputStream& output) {
WireFormat::WriteString(output, bi.Name());
WireFormat::WriteString(output, bi.Type()->GetName());

bool has_custom = bi.Column()->HasCustomSerialization();
if (server_info_.revision >= DBMS_MIN_REVISION_WITH_CUSTOM_SERIALIZATION) {
WireFormat::WriteFixed(output, static_cast<uint8_t>(has_custom));
if (has_custom) {
bi.Column()->SaveSerializationKind(&output);
}
} else {
// Current implementation works only for server version >= v22.1.2.2-stable
throw UnimplementedError(std::string("Can't send column with custom serialisation to a server, server version is too old"));
}

// Empty columns are not serialized and occupy exactly 0 bytes.
// ref https://github.com/ClickHouse/ClickHouse/blob/39b37a3240f74f4871c8c1679910e065af6bea19/src/Formats/NativeWriter.cpp#L163
const bool containsData = block.GetRowCount() > 0;
Expand Down
31 changes: 22 additions & 9 deletions clickhouse/columns/array.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,14 @@ ColumnArray::ColumnArray(ColumnRef data)
}

ColumnArray::ColumnArray(ColumnRef data, std::shared_ptr<ColumnUInt64> offsets)
: Column(Type::CreateArray(data->Type()))
: Column(Type::CreateArray(data->Type()), Serialization::MakeDefault(this))
, data_(data)
, offsets_(offsets)
{
}

ColumnArray::ColumnArray(ColumnArray&& other)
: Column(other.Type())
: Column(other.Type(), Serialization::MakeDefault(this))
, data_(std::move(other.data_))
, offsets_(std::move(other.offsets_))
{
Expand Down Expand Up @@ -73,35 +73,36 @@ bool ColumnArray::LoadPrefix(InputStream* input, size_t rows) {
if (!rows) {
return true;
}

return data_->LoadPrefix(input, rows);
return data_->GetSerialization()->LoadPrefix(data_.get(), input, rows);
}

bool ColumnArray::LoadBody(InputStream* input, size_t rows) {
if (!rows) {
return true;
}
if (!offsets_->LoadBody(input, rows)) {
if (!offsets_->GetSerialization()->LoadBody(offsets_.get(), input, rows)) {
return false;
}

const auto nested_rows = (*offsets_)[rows - 1];
if (nested_rows == 0) {
return true;
}
if (!data_->LoadBody(input, nested_rows)) {
if (!data_->GetSerialization()->LoadBody(data_.get(), input, nested_rows)) {
return false;
}
return true;
}

void ColumnArray::SavePrefix(OutputStream* output) {
data_->SavePrefix(output);
data_->GetSerialization()->SavePrefix(data_.get(), output);
}

void ColumnArray::SaveBody(OutputStream* output) {
offsets_->SaveBody(output);
offsets_->GetSerialization()->SaveBody(offsets_.get(), output);

if (data_->Size() > 0) {
data_->SaveBody(output);
data_->GetSerialization()->SaveBody(data_.get(), output);
}
}

Expand All @@ -120,6 +121,18 @@ void ColumnArray::Swap(Column& other) {
offsets_.swap(col.offsets_);
}

void ColumnArray::SetSerializationKind(Serialization::Kind kind) {
switch (kind)
{
case Serialization::Kind::DEFAULT:
serialization_ = Serialization::MakeDefault(this);
break;
default:
throw UnimplementedError("Serialization kind:" + std::to_string(static_cast<int>(kind))
+ " is not supported for column of " + type_->GetName());
}
}

void ColumnArray::OffsetsIncrease(size_t n) {
offsets_->Append(n);
}
Expand Down
28 changes: 16 additions & 12 deletions clickhouse/columns/array.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,18 +50,6 @@ class ColumnArray : public Column {
/// Appends content of given column to the end of current one.
void Append(ColumnRef column) override;

/// Loads column prefix from input stream.
bool LoadPrefix(InputStream* input, size_t rows) override;

/// Loads column data from input stream.
bool LoadBody(InputStream* input, size_t rows) override;

/// Saves column prefix to output stream.
void SavePrefix(OutputStream* output) override;

/// Saves column data to output stream.
void SaveBody(OutputStream* output) override;

/// Clear column data .
void Clear() override;

Expand All @@ -73,6 +61,8 @@ class ColumnArray : public Column {
ColumnRef CloneEmpty() const override;
void Swap(Column&) override;

void SetSerializationKind(Serialization::Kind kind) override;

void OffsetsIncrease(size_t);

protected:
Expand All @@ -87,6 +77,20 @@ class ColumnArray : public Column {
void Reset();

private:
/// Loads column prefix from input stream.
bool LoadPrefix(InputStream* input, size_t rows);

/// Loads column data from input stream.
bool LoadBody(InputStream* input, size_t rows);

/// Saves column prefix to output stream.
void SavePrefix(OutputStream* output);

/// Saves column data to output stream.
void SaveBody(OutputStream* output);

friend SerializationDefault<ColumnArray>;

ColumnRef data_;
std::shared_ptr<ColumnUInt64> offsets_;
};
Expand Down
40 changes: 30 additions & 10 deletions clickhouse/columns/column.cpp
Original file line number Diff line number Diff line change
@@ -1,24 +1,44 @@
#include "column.h"

#include "../base/wire_format.h"

namespace clickhouse {

bool Column::LoadPrefix(InputStream*, size_t) {
/// does nothing by default
bool Column::Load(InputStream* input, size_t rows) {
assert(serialization_);
return serialization_->LoadPrefix(this, input, rows)
&& serialization_->LoadBody(this, input, rows);
}

/// Saves column data to output stream.
void Column::Save(OutputStream* output) {
assert(serialization_);
serialization_->SavePrefix(this, output);
serialization_->SaveBody(this,output);
}

bool Column::LoadSerializationKind(InputStream* input) {
uint8_t kind;
if (!WireFormat::ReadFixed(*input, &kind)) {
return false;
}
SetSerializationKind(static_cast<Serialization::Kind>(kind));
return true;
}

bool Column::Load(InputStream* input, size_t rows) {
return LoadPrefix(input, rows) && LoadBody(input, rows);
void Column::SaveSerializationKind(OutputStream* output) {
assert(serialization_);
WireFormat::WriteFixed(*output, static_cast<uint8_t>(serialization_->GetKind()));
}

void Column::SavePrefix(OutputStream*) {
/// does nothing by default
SerializationRef Column::GetSerialization() {
assert(serialization_);
return serialization_;
}

/// Saves column data to output stream.
void Column::Save(OutputStream* output) {
SavePrefix(output);
SaveBody(output);
bool Column::HasCustomSerialization() const {
assert(serialization_);
return serialization_->GetKind() != Serialization::Kind::DEFAULT;
}

}
30 changes: 17 additions & 13 deletions clickhouse/columns/column.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include "../types/types.h"
#include "../columns/itemview.h"
#include "../columns/serialization.h"
#include "../exceptions.h"

#include <memory>
Expand All @@ -19,7 +20,11 @@ using ColumnRef = std::shared_ptr<class Column>;
*/
class Column : public std::enable_shared_from_this<Column> {
public:
explicit inline Column(TypeRef type) : type_(type) {}
explicit inline Column(TypeRef type, SerializationRef serialization)
: type_(std::move(type))
, serialization_(std::move(serialization))
{
}

virtual ~Column() {}

Expand Down Expand Up @@ -56,18 +61,6 @@ class Column : public std::enable_shared_from_this<Column> {
/// Should be called only once from the client. Derived classes should not call it.
bool Load(InputStream* input, size_t rows);

/// Loads column prefix from input stream.
virtual bool LoadPrefix(InputStream* input, size_t rows);

/// Loads column data from input stream.
virtual bool LoadBody(InputStream* input, size_t rows) = 0;

/// Saves column prefix to output stream. Column types with prefixes must implement it.
virtual void SavePrefix(OutputStream* output);

/// Saves column body to output stream.
virtual void SaveBody(OutputStream* output) = 0;

/// Template method to save to output stream. It'll call SavePrefix and SaveBody respectively
/// Should be called only once from the client. Derived classes should not call it.
/// Save is split in Prefix and Body because some data types require prefixes and specific serialization order.
Expand All @@ -93,12 +86,23 @@ class Column : public std::enable_shared_from_this<Column> {
throw UnimplementedError("GetItem() is not supported for column of " + type_->GetName());
}

virtual bool LoadSerializationKind(InputStream* input);

virtual void SaveSerializationKind(OutputStream* output);

virtual void SetSerializationKind(Serialization::Kind kind) = 0;

SerializationRef GetSerialization();

virtual bool HasCustomSerialization() const;

friend void swap(Column& left, Column& right) {
left.Swap(right);
}

protected:
TypeRef type_;
SerializationRef serialization_;
};

} // namespace clickhouse
Loading