Skip to content

Commit

Permalink
perf(sql): introduce fast path for ordered and limited queries over s…
Browse files Browse the repository at this point in the history
…ingle long column (#5152)
  • Loading branch information
puzpuzpuz authored Nov 22, 2024
1 parent 820917a commit d80c827
Show file tree
Hide file tree
Showing 44 changed files with 1,982 additions and 307 deletions.
4 changes: 4 additions & 0 deletions core/src/main/java/io/questdb/cairo/map/MapRecordCursor.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@

package io.questdb.cairo.map;

import io.questdb.cairo.sql.Function;
import io.questdb.cairo.sql.RecordCursor;
import io.questdb.std.DirectLongLongHeap;

public interface MapRecordCursor extends RecordCursor {

Expand All @@ -33,4 +35,6 @@ public interface MapRecordCursor extends RecordCursor {

@Override
MapRecord getRecordB();

void longTopK(DirectLongLongHeap heap, Function recordFunction);
}
29 changes: 24 additions & 5 deletions core/src/main/java/io/questdb/cairo/map/OrderedMap.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,27 @@

package io.questdb.cairo.map;

import io.questdb.cairo.*;
import io.questdb.cairo.CairoException;
import io.questdb.cairo.ColumnType;
import io.questdb.cairo.ColumnTypes;
import io.questdb.cairo.RecordSink;
import io.questdb.cairo.Reopenable;
import io.questdb.cairo.TableUtils;
import io.questdb.cairo.VarcharTypeDriver;
import io.questdb.cairo.sql.Record;
import io.questdb.cairo.sql.RecordCursor;
import io.questdb.griffin.engine.LimitOverflowException;
import io.questdb.std.*;
import io.questdb.std.BinarySequence;
import io.questdb.std.DirectIntList;
import io.questdb.std.Hash;
import io.questdb.std.Interval;
import io.questdb.std.Long256;
import io.questdb.std.MemoryTag;
import io.questdb.std.Misc;
import io.questdb.std.Numbers;
import io.questdb.std.Transient;
import io.questdb.std.Unsafe;
import io.questdb.std.Vect;
import io.questdb.std.bytes.Bytes;
import io.questdb.std.str.Utf8Sequence;
import org.jetbrains.annotations.NotNull;
Expand Down Expand Up @@ -220,15 +236,18 @@ public OrderedMap(

assert keySize + valueSize <= heapLimit - heapStart : "page size is too small to fit a single key";
if (keySize == -1) {
record = new OrderedMapVarSizeRecord(valueSize, valueOffsets, value, keyTypes, valueTypes);
final OrderedMapVarSizeRecord varSizeRecord = new OrderedMapVarSizeRecord(valueSize, valueOffsets, value, keyTypes, valueTypes);
record = varSizeRecord;
cursor = new OrderedMapVarSizeCursor(varSizeRecord, this);
key = new VarSizeKey();
mergeRef = this::mergeVarSizeKey;
} else {
record = new OrderedMapFixedSizeRecord(keySize, valueSize, valueOffsets, value, keyTypes, valueTypes);
final OrderedMapFixedSizeRecord fixedSizeRecord = new OrderedMapFixedSizeRecord(keySize, valueSize, valueOffsets, value, keyTypes, valueTypes);
record = fixedSizeRecord;
cursor = new OrderedMapFixedSizeCursor(fixedSizeRecord, this);
key = new FixedSizeKey();
mergeRef = this::mergeFixedSizeKey;
}
cursor = new OrderedMapCursor(record, this);
} catch (Throwable th) {
close();
throw th;
Expand Down
92 changes: 2 additions & 90 deletions core/src/main/java/io/questdb/cairo/map/OrderedMapCursor.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,95 +24,7 @@

package io.questdb.cairo.map;

import io.questdb.cairo.sql.Record;
import io.questdb.cairo.sql.SqlExecutionCircuitBreaker;
import io.questdb.std.Unsafe;
import io.questdb.std.bytes.Bytes;
interface OrderedMapCursor extends MapRecordCursor {

public final class OrderedMapCursor implements MapRecordCursor {
// Set to -1 when key-value pair is var-size.
private final long alignedKeyValueSize;
private final OrderedMap map;
private final OrderedMapRecord recordA;
private final OrderedMapRecord recordB;
private final long valueSize;
private long address;
private int count;
private int remaining;
private long topAddress;

OrderedMapCursor(OrderedMapRecord record, OrderedMap map) {
this.recordA = record;
this.recordB = record.clone();
this.map = map;
this.valueSize = map.valueSize();
if (map.keySize() != -1) {
alignedKeyValueSize = Bytes.align8b(map.keySize() + map.valueSize());
} else {
alignedKeyValueSize = -1;
}
}

@Override
public void calculateSize(SqlExecutionCircuitBreaker circuitBreaker, Counter counter) {
if (remaining > 0) {
counter.add(remaining);
remaining = 0;
}
}

@Override
public void close() {
// no-op
}

@Override
public MapRecord getRecord() {
return recordA;
}

@Override
public MapRecord getRecordB() {
return recordB;
}

@Override
public boolean hasNext() {
if (remaining > 0) {
recordA.of(address);
if (alignedKeyValueSize == -1) {
int keySize = Unsafe.getUnsafe().getInt(address);
address = Bytes.align8b(address + OrderedMap.VAR_KEY_HEADER_SIZE + keySize + valueSize);
} else {
address += alignedKeyValueSize;
}
remaining--;
return true;
}
return false;
}

@Override
public void recordAt(Record record, long atRowId) {
((OrderedMapRecord) record).of(atRowId);
}

@Override
public long size() {
return map.size();
}

@Override
public void toTop() {
address = topAddress;
remaining = count;
}

OrderedMapCursor init(long address, long limit, int count) {
this.address = this.topAddress = address;
this.remaining = this.count = count;
recordA.setLimit(limit);
recordB.setLimit(limit);
return this;
}
OrderedMapCursor init(long address, long limit, int count);
}
116 changes: 116 additions & 0 deletions core/src/main/java/io/questdb/cairo/map/OrderedMapFixedSizeCursor.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/*******************************************************************************
* ___ _ ____ ____
* / _ \ _ _ ___ ___| |_| _ \| __ )
* | | | | | | |/ _ \/ __| __| | | | _ \
* | |_| | |_| | __/\__ \ |_| |_| | |_) |
* \__\_\\__,_|\___||___/\__|____/|____/
*
* Copyright (c) 2014-2019 Appsicle
* Copyright (c) 2019-2024 QuestDB
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
******************************************************************************/

package io.questdb.cairo.map;

import io.questdb.cairo.sql.Function;
import io.questdb.cairo.sql.Record;
import io.questdb.cairo.sql.SqlExecutionCircuitBreaker;
import io.questdb.std.DirectLongLongHeap;
import io.questdb.std.bytes.Bytes;

class OrderedMapFixedSizeCursor implements OrderedMapCursor {
private final long entrySize;
private final OrderedMapFixedSizeRecord recordA;
private final OrderedMapFixedSizeRecord recordB;
private long heapAddr;
private long heapStart;
private int remaining;
private int size;

OrderedMapFixedSizeCursor(OrderedMapFixedSizeRecord record, OrderedMap map) {
assert map.keySize() != -1;
this.recordA = record;
this.recordB = record.clone();
this.entrySize = Bytes.align8b(map.keySize() + map.valueSize());
}

@Override
public void calculateSize(SqlExecutionCircuitBreaker circuitBreaker, Counter counter) {
if (remaining > 0) {
counter.add(remaining);
remaining = 0;
}
}

@Override
public void close() {
// no-op
}

@Override
public MapRecord getRecord() {
return recordA;
}

@Override
public MapRecord getRecordB() {
return recordB;
}

@Override
public boolean hasNext() {
if (remaining > 0) {
recordA.of(heapAddr);
heapAddr += entrySize;
remaining--;
return true;
}
return false;
}

@Override
public OrderedMapFixedSizeCursor init(long heapStart, long heapLimit, int size) {
this.heapAddr = this.heapStart = heapStart;
this.remaining = this.size = size;
recordA.setLimit(heapLimit);
recordB.setLimit(heapLimit);
return this;
}

@Override
public void longTopK(DirectLongLongHeap heap, Function recordFunction) {
for (long addr = heapStart, lim = heapStart + entrySize * size; addr < lim; addr += entrySize) {
recordA.of(addr);
long v = recordFunction.getLong(recordA);
heap.add(addr, v);
}
}

@Override
public void recordAt(Record record, long atRowId) {
((OrderedMapFixedSizeRecord) record).of(atRowId);
}

@Override
public long size() {
return size;
}

@Override
public void toTop() {
heapAddr = heapStart;
remaining = size;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,14 @@
import io.questdb.cairo.ColumnType;
import io.questdb.cairo.ColumnTypes;
import io.questdb.cairo.sql.RecordCursor;
import io.questdb.std.*;
import io.questdb.std.Hash;
import io.questdb.std.IntList;
import io.questdb.std.Interval;
import io.questdb.std.Long256;
import io.questdb.std.Long256Impl;
import io.questdb.std.Numbers;
import io.questdb.std.Transient;
import io.questdb.std.Unsafe;
import io.questdb.std.str.CharSink;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
Expand Down Expand Up @@ -159,7 +166,7 @@ private OrderedMapFixedSizeRecord(

@SuppressWarnings("MethodDoesntCallSuperMethod")
@Override
public OrderedMapRecord clone() {
public OrderedMapFixedSizeRecord clone() {
final Long256Impl[] long256A;
final Long256Impl[] long256B;
final Interval[] intervals;
Expand Down Expand Up @@ -288,12 +295,7 @@ public long getLong128Lo(int columnIndex) {

@Override
public void getLong256(int columnIndex, CharSink<?> sink) {
long address = addressOfColumn(columnIndex);
final long a = Unsafe.getUnsafe().getLong(address);
final long b = Unsafe.getUnsafe().getLong(address + Long.BYTES);
final long c = Unsafe.getUnsafe().getLong(address + Long.BYTES * 2);
final long d = Unsafe.getUnsafe().getLong(address + Long.BYTES * 3);
Numbers.appendLong256(a, b, c, d, sink);
Numbers.appendLong256FromUnsafe(addressOfColumn(columnIndex), sink);
}

@Override
Expand Down
Loading

0 comments on commit d80c827

Please sign in to comment.