Skip to content

Commit

Permalink
move br component from tikv java client to this repo
Browse files Browse the repository at this point in the history
Signed-off-by: marsishandsome <[email protected]>
  • Loading branch information
marsishandsome committed Mar 23, 2022
1 parent 2b13212 commit 20fbe69
Show file tree
Hide file tree
Showing 13 changed files with 371 additions and 8 deletions.
2 changes: 1 addition & 1 deletion online-bulk-load/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ mvn clean package -DskipTests -am -pl online-bulk-load
```
spark-submit \
--master local[*] \
--jars /path/to/tikv-client-java-3.2.0-SNAPSHOT.jar \
--jars /path/to/tikv-client-java-3.3.0-SNAPSHOT.jar \
--class org.tikv.bulkload.example.BulkLoadExample \
online-bulk-load/target/online-bulk-load-0.0.1-SNAPSHOT.jar \
<pdaddr> <key_prefix> <data_size> <partition_nums>
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@
<dependency>
<groupId>org.tikv</groupId>
<artifactId>tikv-client-java</artifactId>
<version>3.2.0-SNAPSHOT</version>
<version>3.3.0-SNAPSHOT</version>
</dependency>

<dependency>
Expand Down
2 changes: 1 addition & 1 deletion sst-data-source/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ br backup raw \
```
spark-submit \
--master local[*] \
--jars /path/to/tikv-client-java-3.2.0-SNAPSHOT.jar \
--jars /path/to/tikv-client-java-3.3.0-SNAPSHOT.jar \
--class org.tikv.datasources.sst.example.SSTDataSourceExample \
sst-data-source/target/sst-data-source-0.0.1-SNAPSHOT.jar \
hdfs:///path/to/sst/
Expand Down
12 changes: 12 additions & 0 deletions sst-data-source/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,18 @@
<packaging>jar</packaging>
<name>Spark SST Data Source</name>

<properties>
<rocksdb.version>6.22.1.1</rocksdb.version>
</properties>

<dependencies>
<dependency>
<groupId>org.rocksdb</groupId>
<artifactId>rocksdbjni</artifactId>
<version>${rocksdb.version}</version>
</dependency>
</dependencies>

<build>
<extensions>
<extension>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Copyright 2021 TiKV Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.tikv.datasources.br;

import java.io.Serializable;
import org.rocksdb.Options;
import org.rocksdb.ReadOptions;
import org.tikv.common.exception.SSTDecodeException;
import org.tikv.kvproto.Brpb;

public class BackupDecoder implements Serializable {
private final Brpb.BackupMeta backupMeta;
private final boolean ttlEnabled;
private final KVDecoder kvDecoder;

public BackupDecoder(Brpb.BackupMeta backupMeta) throws SSTDecodeException {
this.backupMeta = backupMeta;
this.ttlEnabled = false;
this.kvDecoder = initKVDecoder();
}

public BackupDecoder(Brpb.BackupMeta backupMeta, boolean ttlEnabled) throws SSTDecodeException {
this.backupMeta = backupMeta;
this.ttlEnabled = ttlEnabled;
this.kvDecoder = initKVDecoder();
}

private KVDecoder initKVDecoder() throws SSTDecodeException {
if (backupMeta.getIsRawKv()) {
if ("V1".equals(backupMeta.getApiVersion().name())) {
return new RawKVDecoderV1(ttlEnabled);
} else {
throw new SSTDecodeException(
"does not support decode APIVersion " + backupMeta.getApiVersion().name());
}
} else {
throw new SSTDecodeException("TxnKV is not supported yet!");
}
}

public org.tikv.datasources.br.SSTDecoder decodeSST(String sstFilePath) {
return decodeSST(sstFilePath, new Options(), new ReadOptions());
}

public org.tikv.datasources.br.SSTDecoder decodeSST(String sstFilePath, Options options, ReadOptions readOptions) {
return new SSTDecoder(sstFilePath, kvDecoder, options, readOptions);
}

public Brpb.BackupMeta getBackupMeta() {
return backupMeta;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright 2021 TiKV Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.tikv.datasources.br;

import com.google.protobuf.InvalidProtocolBufferException;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import org.tikv.kvproto.Brpb;

public class BackupMetaDecoder {
private final Brpb.BackupMeta backupMeta;

public BackupMetaDecoder(byte[] data) throws org.tikv.shade.com.google.protobuf.InvalidProtocolBufferException {
this.backupMeta = Brpb.BackupMeta.parseFrom(data);
}

public Brpb.BackupMeta getBackupMeta() {
return backupMeta;
}

public static BackupMetaDecoder parse(String backupMetaFilePath) throws IOException {
byte[] data = Files.readAllBytes(new File(backupMetaFilePath).toPath());
return new BackupMetaDecoder(data);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright 2021 TiKV Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.tikv.datasources.br;

import com.google.protobuf.ByteString;
import java.io.Serializable;

public interface KVDecoder extends Serializable {
ByteString decodeKey(byte[] key);

ByteString decodeValue(byte[] value);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright 2021 TiKV Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.tikv.datasources.br;

import com.google.protobuf.ByteString;
import java.util.Arrays;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RawKVDecoderV1 implements KVDecoder {
private static final Logger logger = LoggerFactory.getLogger(SSTIterator.class);

private final boolean ttlEnabled;

public RawKVDecoderV1(boolean ttlEnabled) {
this.ttlEnabled = ttlEnabled;
}

@Override
public ByteString decodeKey(byte[] key) {
if (key == null || key.length == 0) {
logger.warn(
"skip Key-Value pair because key == null || key.length == 0, key = "
+ Arrays.toString(key));
return null;
} else if (key[0] != 'z') {
logger.warn("skip Key-Value pair because key[0] != 'z', key = " + Arrays.toString(key));
return null;
}
return ByteString.copyFrom(key, 1, key.length - 1);
}

@Override
public ByteString decodeValue(byte[] value) {
if (!ttlEnabled) {
return ByteString.copyFrom(value);
} else {
return ByteString.copyFrom(value).substring(0, value.length - 8);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Copyright 2021 TiKV Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.tikv.datasources.br;

import com.google.protobuf.ByteString;
import java.util.Iterator;
import org.rocksdb.Options;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDBException;
import org.rocksdb.SstFileReader;
import org.rocksdb.SstFileReaderIterator;
import org.tikv.common.util.Pair;

public class SSTDecoder {
private final String filePath;
private final KVDecoder kvDecoder;
private final Options options;
private final ReadOptions readOptions;

private SstFileReader sstFileReader;
private SstFileReaderIterator iterator;

public SSTDecoder(String sstFilePath, KVDecoder kvDecoder) {
this.filePath = sstFilePath;
this.kvDecoder = kvDecoder;
this.options = new Options();
this.readOptions = new ReadOptions();
}

public SSTDecoder(
String filePath, KVDecoder kvDecoder, Options options, ReadOptions readOptions) {
this.filePath = filePath;
this.kvDecoder = kvDecoder;
this.options = options;
this.readOptions = readOptions;
}

public synchronized Iterator<Pair<ByteString, ByteString>> getIterator() throws RocksDBException {
if (sstFileReader != null || iterator != null) {
throw new RocksDBException("File already opened!");
}

sstFileReader = new SstFileReader(new Options());
sstFileReader.open(filePath);
iterator = sstFileReader.newIterator(new ReadOptions());
return new SSTIterator(iterator, kvDecoder);
}

public synchronized void close() {
try {
if (iterator != null) {
iterator.close();
}
} finally {
iterator = null;
}

try {
if (sstFileReader != null) {
sstFileReader.close();
}
} finally {
sstFileReader = null;
}
}

public String getFilePath() {
return filePath;
}

public Options getOptions() {
return options;
}

public ReadOptions getReadOptions() {
return readOptions;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright 2021 TiKV Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.tikv.datasources.br;

import com.google.protobuf.ByteString;
import java.util.Iterator;
import org.rocksdb.SstFileReaderIterator;
import org.tikv.common.util.Pair;

public class SSTIterator implements Iterator<Pair<ByteString, ByteString>> {
private final SstFileReaderIterator iterator;
private final KVDecoder kvDecoder;

private Pair<ByteString, ByteString> nextPair;

public SSTIterator(SstFileReaderIterator iterator, KVDecoder kvDecoder) {
this.iterator = iterator;
this.kvDecoder = kvDecoder;
this.iterator.seekToFirst();
this.nextPair = processNext();
}

@Override
public boolean hasNext() {
return nextPair != null;
}

@Override
public Pair<ByteString, ByteString> next() {
Pair<ByteString, ByteString> result = nextPair;
nextPair = processNext();
return result;
}

private Pair<ByteString, ByteString> processNext() {
if (iterator.isValid()) {
ByteString key = kvDecoder.decodeKey(iterator.key());
ByteString value = kvDecoder.decodeValue(iterator.value());
iterator.next();
if (key != null) {
return Pair.create(key, value);
} else {
return processNext();
}
} else {
return null;
}
}
}
Loading

0 comments on commit 20fbe69

Please sign in to comment.