Skip to content

Commit

Permalink
Fix the problem of map(low_cardinality) and tuple(low_cardinality) (#51)
Browse files Browse the repository at this point in the history
* Fix the problem of map(low_cardinality) and tuple(low_cardinality)

* revise code structure

* revise code structure

* revise code and fix array(low_cardinality) problem
  • Loading branch information
Jasmine-ge authored Jul 31, 2024
1 parent dba8c94 commit c57d951
Show file tree
Hide file tree
Showing 42 changed files with 403 additions and 192 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@

package com.timeplus.data;

import java.io.IOException;
import java.sql.SQLException;

import com.timeplus.serde.BinarySerializer;

public abstract class AbstractColumn implements IColumn {

protected final String name;
Expand Down Expand Up @@ -63,4 +68,15 @@ public void setColumnWriterBuffer(ColumnWriterBuffer buffer) {
public ColumnWriterBuffer getColumnWriterBuffer() {
return buffer;
}

@Override
public void SerializeBulkPrefix(BinarySerializer serializer) throws IOException, SQLException {

}

@Override
public void SerializeBulkSuffix(BinarySerializer serializer) throws IOException, SQLException {

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,9 @@ public static Block readFrom(BinaryDeserializer deserializer,
String type = deserializer.readUTF8StringBinary();

IDataType dataType = DataTypeFactory.get(type, serverContext);
dataType.deserializeBinaryPrefix(rowCnt, deserializer);
Object[] arr = dataType.deserializeBinaryBulk(rowCnt, deserializer);
dataType.deserializeBinarySuffix(rowCnt, deserializer);
columns[i] = ColumnFactory.createColumn(name, dataType, arr);
}

Expand Down Expand Up @@ -124,7 +126,11 @@ public void writeTo(BinarySerializer serializer) throws IOException, SQLExceptio
serializer.writeVarInt(rowCnt);

for (IColumn column : columns) {
column.flushToSerializer(serializer, true);
serializer.writeUTF8StringBinary(column.name());
serializer.writeUTF8StringBinary(column.type().name());
column.SerializeBulkPrefix(serializer);
column.SerializeBulk(serializer, true);
column.SerializeBulkSuffix(serializer);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,10 @@ public void write(Object object) throws IOException, SQLException {
}

@Override
public void flushToSerializer(BinarySerializer serializer, boolean now) throws IOException, SQLException {
if (isExported()) {
serializer.writeUTF8StringBinary(name);
serializer.writeUTF8StringBinary(type.name());
}

public void SerializeBulk(BinarySerializer serializer, Boolean now) throws IOException, SQLException {
if (now) {
buffer.writeTo(serializer);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import com.timeplus.jdbc.TimeplusArray;
import com.timeplus.data.type.complex.DataTypeArray;
import com.timeplus.serde.BinarySerializer;

import java.io.IOException;
import java.sql.SQLException;
import java.util.ArrayList;
Expand All @@ -27,12 +26,12 @@ public class ColumnArray extends AbstractColumn {

private final List<Long> offsets;
// data represents nested column in ColumnArray
private final IColumn data;
private final IColumn nestedColumn;

public ColumnArray(String name, DataTypeArray type, Object[] values) {
super(name, type, values);
offsets = new ArrayList<>();
data = ColumnFactory.createColumn(null, type.getElemDataType(), null);
nestedColumn = ColumnFactory.createColumn(null, type.getElemDataType(), null);
}

@Override
Expand All @@ -41,22 +40,7 @@ public void write(Object object) throws IOException, SQLException {

offsets.add(offsets.isEmpty() ? arr.length : offsets.get((offsets.size() - 1)) + arr.length);
for (Object field : arr) {
data.write(field);
}
}

@Override
public void flushToSerializer(BinarySerializer serializer, boolean immediate) throws SQLException, IOException {
if (isExported()) {
serializer.writeUTF8StringBinary(name);
serializer.writeUTF8StringBinary(type.name());
}

flushOffsets(serializer);
data.flushToSerializer(serializer, false);

if (immediate) {
buffer.writeTo(serializer);
nestedColumn.write(field);
}
}

Expand All @@ -69,12 +53,34 @@ public void flushOffsets(BinarySerializer serializer) throws IOException {
@Override
public void setColumnWriterBuffer(ColumnWriterBuffer buffer) {
super.setColumnWriterBuffer(buffer);
data.setColumnWriterBuffer(buffer);
nestedColumn.setColumnWriterBuffer(buffer);
}

@Override
public void clear() {
offsets.clear();
data.clear();
nestedColumn.clear();
}

@Override
public void SerializeBulkPrefix(BinarySerializer serializer) throws SQLException, IOException {
nestedColumn.SerializeBulkPrefix(serializer);
}

@Override
public void SerializeBulk(BinarySerializer serializer, Boolean now) throws IOException, SQLException {
flushOffsets(serializer);
nestedColumn.SerializeBulk(serializer, false);

if (now) {
buffer.writeTo(serializer);
}
}

@Override
public void SerializeBulkSuffix(BinarySerializer serializer) throws SQLException, IOException {
nestedColumn.SerializeBulkSuffix(serializer);
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@

public class ColumnLowCardinality extends AbstractColumn {

private IColumn data;
private final List<Long> indexes;
private final List<Object> dict;
private final Long version = 1L;
private boolean nested_is_nullable;
private IDataType nested_type;

public ColumnLowCardinality(String name, DataTypeLowCardinality type, Object[] values) {
super(name, type, values);
Expand All @@ -37,13 +37,12 @@ public ColumnLowCardinality(String name, DataTypeLowCardinality type, Object[] v
nested_is_nullable = type.getNestedTypes().nullable();
/// If a nested type is nullable, always add two hard dictionary keys in front: [0]: null, [1]: default value
if (nested_is_nullable) {
IDataType nested_type = ((DataTypeNullable) type.getNestedTypes()).getNestedDataType();
data = ColumnFactory.createColumn(null, nested_type, null);
nested_type = ((DataTypeNullable) type.getNestedTypes()).getNestedDataType();
dict.add(type.getNestedTypes().defaultValue());
dict.add(type.getNestedTypes().defaultValue());
}
else {
data = ColumnFactory.createColumn(null, type.getNestedTypes(), null);
nested_type = type.getNestedTypes();
}
}

Expand All @@ -70,37 +69,32 @@ public void write(Object object) throws IOException, SQLException {
}

@Override
public void flushToSerializer(BinarySerializer serializer, boolean immediate) throws IOException, SQLException {
if (isExported()) {
serializer.writeUTF8StringBinary(name);
serializer.writeUTF8StringBinary(type.name());
}

if (immediate) {
/// The data layout: [version][index_type][dictionary][indexes]
serializer.writeLong(version);
serializer.writeLong(IndexType.UInt64.getValue() | IndexType.HasAdditionalKeysBit.getValue());

serializer.writeLong(dict.size());
for (int i = 0; i < dict.size(); i++) {
data.write(dict.get(i));
}
data.flushToSerializer(serializer, true);
public void setColumnWriterBuffer(ColumnWriterBuffer buffer) {
super.setColumnWriterBuffer(buffer);
}

serializer.writeLong(indexes.size()); // give index type size
for (int i = 0; i < indexes.size(); i++) {
serializer.writeLong(indexes.get(i));
}
}
@Override
public void clear() {
}

@Override
public void setColumnWriterBuffer(ColumnWriterBuffer buffer) {
super.setColumnWriterBuffer(buffer);
data.setColumnWriterBuffer(buffer);
public void SerializeBulkPrefix(BinarySerializer serializer) throws IOException, SQLException {
serializer.writeLong(version);
}

@Override
public void clear() {
public void SerializeBulk(BinarySerializer serializer, Boolean now) throws IOException, SQLException {
/// The data layout: [index_type][dictionary][indexes]
serializer.writeLong(IndexType.UInt64.getValue() | IndexType.HasAdditionalKeysBit.getValue());
serializer.writeLong(dict.size());

nested_type.serializeBinaryBulk(dict.toArray(), serializer);

serializer.writeLong(indexes.size()); // give index type size
for (int i = 0; i < indexes.size(); i++) {
serializer.writeLong(indexes.get(i));
}

}

}
48 changes: 26 additions & 22 deletions timeplus-native-jdbc/src/main/java/com/timeplus/data/ColumnMap.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,24 +56,6 @@ public void write(Object object) throws IOException, SQLException {

}

@Override
public void flushToSerializer(BinarySerializer serializer, boolean now) throws IOException, SQLException {
if (isExported()) {
serializer.writeUTF8StringBinary(name);
serializer.writeUTF8StringBinary(type.name());
}

flushOffsets(serializer);

for (IColumn data : columnDataArray) {
data.flushToSerializer(serializer, true);
}

if (now) {
buffer.writeTo(serializer);
}
}

public void flushOffsets(BinarySerializer serializer) throws IOException {
for (long offsetList : offsets) {
serializer.writeLong(offsetList);
Expand All @@ -84,16 +66,38 @@ public void flushOffsets(BinarySerializer serializer) throws IOException {
public void setColumnWriterBuffer(ColumnWriterBuffer buffer) {
super.setColumnWriterBuffer(buffer);

for (IColumn data : columnDataArray) {
data.setColumnWriterBuffer(new ColumnWriterBuffer());
for (IColumn nestedColumn : columnDataArray) {
nestedColumn.setColumnWriterBuffer(new ColumnWriterBuffer());
}
}

@Override
public void clear() {
offsets.clear();
for (IColumn data : columnDataArray) {
data.clear();
for (IColumn nestedColumn : columnDataArray) {
nestedColumn.clear();
}
}

@Override
public void SerializeBulkPrefix(BinarySerializer serializer) throws SQLException, IOException {
for (IColumn nestedColumn : columnDataArray) {
nestedColumn.SerializeBulkPrefix(serializer);
}
}

@Override
public void SerializeBulk(BinarySerializer serializer, Boolean now) throws IOException, SQLException {

flushOffsets(serializer);

for (IColumn nestedColumn : columnDataArray) {
nestedColumn.SerializeBulk(serializer, true);
}

if (now) {
buffer.writeTo(serializer);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,43 +27,40 @@ public class ColumnNullable extends AbstractColumn {

private final List<Byte> nullableSign;
// data represents nested column in ColumnArray
private final IColumn data;
private final IColumn nestedColumn;

public ColumnNullable(String name, DataTypeNullable type, Object[] values) {
super(name, type, values);
nullableSign = new ArrayList<>();
data = ColumnFactory.createColumn(null, type.getNestedDataType(), null);
nestedColumn = ColumnFactory.createColumn(null, type.getNestedDataType(), null);
}

@Override
public void write(@Nullable Object object) throws IOException, SQLException {
if (object == null) {
nullableSign.add((byte) 1);
data.write(type.defaultValue()); // write whatever for padding
nestedColumn.write(type.defaultValue()); // write whatever for padding
} else {
nullableSign.add((byte) 0);
data.write(object);
nestedColumn.write(object);
}
}

@Override
public void flushToSerializer(BinarySerializer serializer, boolean immediate) throws IOException {
if (isExported()) {
serializer.writeUTF8StringBinary(name);
serializer.writeUTF8StringBinary(type.name());
}
public void setColumnWriterBuffer(ColumnWriterBuffer buffer) {
super.setColumnWriterBuffer(buffer);
nestedColumn.setColumnWriterBuffer(buffer);
}


@Override
public void SerializeBulk(BinarySerializer serializer, Boolean now) throws IOException, SQLException {
for (byte sign : nullableSign) {
serializer.writeByte(sign);
}

if (immediate)
if (now)
buffer.writeTo(serializer);
}

@Override
public void setColumnWriterBuffer(ColumnWriterBuffer buffer) {
super.setColumnWriterBuffer(buffer);
data.setColumnWriterBuffer(buffer);
}
}
Loading

0 comments on commit c57d951

Please sign in to comment.