Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement reverseScan #711

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 26 additions & 6 deletions src/main/java/org/tikv/common/KVClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,25 @@ public List<KvPair> batchGet(BackOffer backOffer, List<ByteString> keys, long ve
*/
public List<KvPair> scan(ByteString startKey, ByteString endKey, long version)
throws GrpcException {
Iterator<KvPair> iterator = scanIterator(conf, clientBuilder, startKey, endKey, version);
return scan(startKey, endKey, version, false);
}

/**
* Scan key-value pairs from TiKV in range [startKey, endKey) or if reversely, [endKey, startKey)
*
* @param startKey start key, inclusive
* @param endKey end key, exclusive
* @param reverse whether to scan reversely
* @return list of key-value pairs in range
*/
public List<KvPair> scan(ByteString startKey, ByteString endKey, long version, boolean reverse)
throws GrpcException {
Iterator<KvPair> iterator;
if (reverse) {
iterator = scanIterator(conf, clientBuilder, endKey, startKey, version, reverse);
} else {
iterator = scanIterator(conf, clientBuilder, startKey, endKey, version, reverse);
}
List<KvPair> result = new ArrayList<>();
iterator.forEachRemaining(result::add);
return result;
Expand All @@ -115,7 +133,7 @@ public List<KvPair> scan(ByteString startKey, ByteString endKey, long version)
* @return list of key-value pairs in range
*/
public List<KvPair> scan(ByteString startKey, long version, int limit) throws GrpcException {
Iterator<KvPair> iterator = scanIterator(conf, clientBuilder, startKey, version, limit);
Iterator<KvPair> iterator = scanIterator(conf, clientBuilder, startKey, version, limit, false);
List<KvPair> result = new ArrayList<>();
iterator.forEachRemaining(result::add);
return result;
Expand Down Expand Up @@ -183,16 +201,18 @@ private Iterator<KvPair> scanIterator(
RegionStoreClientBuilder builder,
ByteString startKey,
ByteString endKey,
long version) {
return new ConcreteScanIterator(conf, builder, startKey, endKey, version);
long version,
boolean reverse) {
return new ConcreteScanIterator(conf, builder, startKey, endKey, version, reverse);
}

private Iterator<KvPair> scanIterator(
TiConfiguration conf,
RegionStoreClientBuilder builder,
ByteString startKey,
long version,
int limit) {
return new ConcreteScanIterator(conf, builder, startKey, version, limit);
int limit,
boolean reverse) {
return new ConcreteScanIterator(conf, builder, startKey, version, limit, reverse);
}
}
38 changes: 36 additions & 2 deletions src/main/java/org/tikv/common/Snapshot.java
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,24 @@ public Iterator<KvPair> scan(ByteString startKey) {
session.getRegionStoreClientBuilder(),
startKey,
timestamp.getVersion(),
Integer.MAX_VALUE);
Integer.MAX_VALUE,
false);
}

/**
* scan all keys becofe startKey, inclusive
*
* @param startKey start of keys
* @return iterator of kvPair
*/
public Iterator<KvPair> reverseScan(ByteString startKey) {
return new ConcreteScanIterator(
session.getConf(),
session.getRegionStoreClientBuilder(),
startKey,
timestamp.getVersion(),
Integer.MAX_VALUE,
true);
}

/**
Expand All @@ -173,7 +190,24 @@ public Iterator<KvPair> scanPrefix(ByteString prefix) {
session.getRegionStoreClientBuilder(),
prefix,
nextPrefix,
timestamp.getVersion());
timestamp.getVersion(),
false);
}
/**
* scan all keys with prefix, reversely
*
* @param prefix prefix of keys
* @return iterator of kvPair
*/
public Iterator<KvPair> reverseScanPrefix(ByteString prefix) {
ByteString nextPrefix = Key.toRawKey(prefix).nextPrefix().toByteString();
return new ConcreteScanIterator(
session.getConf(),
session.getRegionStoreClientBuilder(),
nextPrefix,
prefix,
timestamp.getVersion(),
true);
}

public TiConfiguration getConf() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,19 +44,21 @@ public ConcreteScanIterator(
RegionStoreClientBuilder builder,
ByteString startKey,
long version,
int limit) {
int limit,
boolean reverse) {
// Passing endKey as ByteString.EMPTY means that endKey is +INF by default,
this(conf, builder, startKey, ByteString.EMPTY, version, limit);
this(conf, builder, startKey, ByteString.EMPTY, version, limit, reverse);
}

public ConcreteScanIterator(
TiConfiguration conf,
RegionStoreClientBuilder builder,
ByteString startKey,
ByteString endKey,
long version) {
long version,
boolean reverse) {
// Passing endKey as ByteString.EMPTY means that endKey is +INF by default,
this(conf, builder, startKey, endKey, version, Integer.MAX_VALUE);
this(conf, builder, startKey, endKey, version, Integer.MAX_VALUE, reverse);
}

private ConcreteScanIterator(
Expand All @@ -65,8 +67,9 @@ private ConcreteScanIterator(
ByteString startKey,
ByteString endKey,
long version,
int limit) {
super(conf, builder, startKey, endKey, limit, false);
int limit,
boolean reverse) {
super(conf, builder, startKey, endKey, limit, false, reverse);
this.version = version;
}

Expand All @@ -76,7 +79,7 @@ TiRegion loadCurrentRegionToCache() throws GrpcException {
try (RegionStoreClient client = builder.build(startKey)) {
client.setTimeout(conf.getScanTimeout());
BackOffer backOffer = ConcreteBackOffer.newScannerNextMaxBackOff();
currentCache = client.scan(backOffer, startKey, version);
currentCache = client.scan(backOffer, startKey, version, reverse);
// If we get region before scan, we will use region from cache which
// may have wrong end key. This may miss some regions that split from old region.
// Client will get the newest region during scan. So we need to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,9 @@ public RawScanIterator(
ByteString endKey,
int limit,
boolean keyOnly,
boolean reverse,
BackOffer scanBackOffer) {
super(conf, builder, startKey, endKey, limit, keyOnly);
super(conf, builder, startKey, endKey, limit, keyOnly, reverse);

this.scanBackOffer = scanBackOffer;
}
Expand All @@ -56,7 +57,7 @@ TiRegion loadCurrentRegionToCache() throws GrpcException {
currentCache = null;
} else {
try {
currentCache = client.rawScan(backOffer, startKey, limit, keyOnly);
currentCache = client.rawScan(backOffer, startKey, limit, keyOnly, reverse);
// Client will get the newest region during scan. So we need to
// update region after scan.
region = client.getRegion();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,21 +43,24 @@ public abstract class ScanIterator implements Iterator<Kvrpcpb.KvPair> {
protected Key endKey;
protected boolean hasEndKey;
protected boolean processingLastBatch = false;
protected boolean reverse = false;

ScanIterator(
TiConfiguration conf,
RegionStoreClientBuilder builder,
ByteString startKey,
ByteString endKey,
int limit,
boolean keyOnly) {
boolean keyOnly,
boolean reverse) {
this.startKey = requireNonNull(startKey, "start key is null");
this.endKey = Key.toRawKey(requireNonNull(endKey, "end key is null"));
this.hasEndKey = !endKey.isEmpty();
this.limit = limit;
this.keyOnly = keyOnly;
this.conf = conf;
this.builder = builder;
this.reverse = reverse;
}

/**
Expand Down
23 changes: 15 additions & 8 deletions src/main/java/org/tikv/common/region/RegionStoreClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,7 @@
import org.tikv.common.util.HistogramUtils;
import org.tikv.common.util.Pair;
import org.tikv.common.util.RangeSplitter;
import org.tikv.kvproto.Coprocessor;
import org.tikv.kvproto.Errorpb;
import org.tikv.kvproto.*;
import org.tikv.kvproto.Kvrpcpb.BatchGetRequest;
import org.tikv.kvproto.Kvrpcpb.BatchGetResponse;
import org.tikv.kvproto.Kvrpcpb.CommitRequest;
Expand Down Expand Up @@ -109,8 +108,6 @@
import org.tikv.kvproto.Kvrpcpb.SplitRegionResponse;
import org.tikv.kvproto.Kvrpcpb.TxnHeartBeatRequest;
import org.tikv.kvproto.Kvrpcpb.TxnHeartBeatResponse;
import org.tikv.kvproto.Metapb;
import org.tikv.kvproto.TikvGrpc;
import org.tikv.kvproto.TikvGrpc.TikvBlockingStub;
import org.tikv.kvproto.TikvGrpc.TikvFutureStub;
import org.tikv.txn.AbstractLockResolverClient;
Expand Down Expand Up @@ -336,7 +333,7 @@ private List<KvPair> handleBatchGetResponse(
}

public List<KvPair> scan(
BackOffer backOffer, ByteString startKey, long version, boolean keyOnly) {
BackOffer backOffer, ByteString startKey, long version, boolean keyOnly, boolean reverse) {
boolean forWrite = false;
while (true) {
Supplier<ScanRequest> request =
Expand All @@ -348,6 +345,7 @@ public List<KvPair> scan(
.setStartKey(codec.encodeKey(startKey))
.setVersion(version)
.setKeyOnly(keyOnly)
.setReverse(reverse)
.setLimit(getConf().getScanBatchSize())
.build();

Expand Down Expand Up @@ -417,6 +415,11 @@ public List<KvPair> scan(BackOffer backOffer, ByteString startKey, long version)
return scan(backOffer, startKey, version, false);
}

public List<KvPair> scan(
BackOffer backOffer, ByteString startKey, long version, boolean reverse) {
return scan(backOffer, startKey, version, false, reverse);
}

/**
* Prewrite batch keys
*
Expand Down Expand Up @@ -1238,9 +1241,11 @@ private void handleRawBatchDelete(RawBatchDeleteResponse resp) {
* @param backOffer BackOffer
* @param key startKey
* @param keyOnly true if value of KvPair is not needed
* @param reverse
* @return KvPair list
*/
public List<KvPair> rawScan(BackOffer backOffer, ByteString key, int limit, boolean keyOnly) {
public List<KvPair> rawScan(
BackOffer backOffer, ByteString key, int limit, boolean keyOnly, boolean reverse) {
Long clusterId = pdClient.getClusterId();
Histogram.Timer requestTimer =
GRPC_RAW_REQUEST_LATENCY.labels("client_grpc_raw_scan", clusterId.toString()).startTimer();
Expand All @@ -1254,6 +1259,7 @@ public List<KvPair> rawScan(BackOffer backOffer, ByteString key, int limit, bool
.setEndKey(range.second)
.setKeyOnly(keyOnly)
.setLimit(limit)
.setReverse(reverse)
.build();
};

Expand All @@ -1271,8 +1277,9 @@ public List<KvPair> rawScan(BackOffer backOffer, ByteString key, int limit, bool
}
}

public List<KvPair> rawScan(BackOffer backOffer, ByteString key, boolean keyOnly) {
return rawScan(backOffer, key, getConf().getScanBatchSize(), keyOnly);
public List<KvPair> rawScan(
BackOffer backOffer, ByteString key, boolean keyOnly, boolean reverse) {
return rawScan(backOffer, key, getConf().getScanBatchSize(), keyOnly, reverse);
}

private List<KvPair> rawScanHelper(RawScanResponse resp) {
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/org/tikv/raw/RawKVClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -1011,7 +1011,7 @@ private Iterator<KvPair> rawScanIterator(
if (limit > MAX_RAW_SCAN_LIMIT) {
throw ERR_MAX_SCAN_LIMIT_EXCEEDED;
}
return new RawScanIterator(conf, builder, startKey, endKey, limit, keyOnly, backOffer);
return new RawScanIterator(conf, builder, startKey, endKey, limit, keyOnly, false, backOffer);
}

/**
Expand Down
64 changes: 54 additions & 10 deletions src/main/java/org/tikv/txn/KVClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,37 @@ public List<Kvrpcpb.KvPair> batchGet(BackOffer backOffer, List<ByteString> keys,
*/
public List<Kvrpcpb.KvPair> scan(ByteString startKey, ByteString endKey, long version)
throws GrpcException {
return scan(startKey, endKey, version, false);
}
/**
* Scan key-value pairs from TiKV reversely in range (startKey, endKey]
*
* @param startKey start key, inclusive
* @param endKey end key, exclusive
* @return list of key-value pairs in range
*/
public List<Kvrpcpb.KvPair> reverseScan(ByteString startKey, ByteString endKey, long version)
throws GrpcException {
return scan(endKey, startKey, version, true);
}

public List<Kvrpcpb.KvPair> scan(
ByteString startKey, ByteString endKey, long version, boolean reverse) throws GrpcException {
Iterator<Kvrpcpb.KvPair> iterator;
if (reverse) {
iterator = scanIterator(conf, clientBuilder, endKey, startKey, version, reverse);
} else {
iterator = scanIterator(conf, clientBuilder, startKey, endKey, version, reverse);
}
List<Kvrpcpb.KvPair> result = new ArrayList<>();
iterator.forEachRemaining(result::add);
return result;
}

public List<Kvrpcpb.KvPair> scan(ByteString startKey, long version, int limit, boolean reverse)
throws GrpcException {
Iterator<Kvrpcpb.KvPair> iterator =
scanIterator(conf, clientBuilder, startKey, endKey, version);
scanIterator(conf, clientBuilder, startKey, version, limit, reverse);
List<Kvrpcpb.KvPair> result = new ArrayList<>();
iterator.forEachRemaining(result::add);
return result;
Expand All @@ -130,14 +159,27 @@ public List<Kvrpcpb.KvPair> scan(ByteString startKey, ByteString endKey, long ve
*/
public List<Kvrpcpb.KvPair> scan(ByteString startKey, long version, int limit)
throws GrpcException {
Iterator<Kvrpcpb.KvPair> iterator = scanIterator(conf, clientBuilder, startKey, version, limit);
List<Kvrpcpb.KvPair> result = new ArrayList<>();
iterator.forEachRemaining(result::add);
return result;
return scan(startKey, version, limit, false);
}

/**
* Scan key-value pairs reversively from TiKV in range ('', endKey], maximum to `limit` pairs
*
* @param endKey start key, inclusive
* @param limit limit of kv pairs
* @return list of key-value pairs in range
*/
public List<Kvrpcpb.KvPair> reverseScan(ByteString endKey, long version, int limit)
throws GrpcException {
return scan(endKey, version, limit, true);
}

public List<Kvrpcpb.KvPair> scan(ByteString startKey, long version) throws GrpcException {
return scan(startKey, version, Integer.MAX_VALUE);
return scan(startKey, version, Integer.MAX_VALUE, false);
}

public List<Kvrpcpb.KvPair> reverseScan(ByteString endKey, long version) throws GrpcException {
return scan(endKey, version, Integer.MAX_VALUE, true);
}

public synchronized void ingest(List<Pair<ByteString, ByteString>> list) throws GrpcException {
Expand Down Expand Up @@ -264,17 +306,19 @@ private Iterator<Kvrpcpb.KvPair> scanIterator(
RegionStoreClientBuilder builder,
ByteString startKey,
ByteString endKey,
long version) {
return new ConcreteScanIterator(conf, builder, startKey, endKey, version);
long version,
boolean reverse) {
return new ConcreteScanIterator(conf, builder, startKey, endKey, version, reverse);
}

private Iterator<Kvrpcpb.KvPair> scanIterator(
TiConfiguration conf,
RegionStoreClientBuilder builder,
ByteString startKey,
long version,
int limit) {
return new ConcreteScanIterator(conf, builder, startKey, version, limit);
int limit,
boolean reverse) {
return new ConcreteScanIterator(conf, builder, startKey, version, limit, reverse);
}

private void doIngest(TiRegion region, List<Pair<ByteString, ByteString>> sortedList)
Expand Down