Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[to #519] move br component from tikv java client to this repo #73

Merged
merged 2 commits into from
Mar 23, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
# ignore maven generated files
dependency-reduced-pom.xml

#ignore idea configuration
.idea
*.iml
Expand Down Expand Up @@ -34,4 +37,4 @@ target
hs_err_pid*

# ignore Mac files
.DS_Store
.DS_Store
2 changes: 1 addition & 1 deletion online-bulk-load/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ mvn clean package -DskipTests -am -pl online-bulk-load
```
spark-submit \
--master local[*] \
--jars /path/to/tikv-client-java-3.2.0-SNAPSHOT.jar \
--jars /path/to/tikv-client-java-3.3.0-SNAPSHOT.jar \
--class org.tikv.bulkload.example.BulkLoadExample \
online-bulk-load/target/online-bulk-load-0.0.1-SNAPSHOT.jar \
<pdaddr> <key_prefix> <data_size> <partition_nums>
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@
<dependency>
<groupId>org.tikv</groupId>
<artifactId>tikv-client-java</artifactId>
<version>3.2.0-SNAPSHOT</version>
<version>3.3.0-SNAPSHOT</version>
</dependency>

<dependency>
Expand Down
2 changes: 1 addition & 1 deletion sst-data-source/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ br backup raw \
```
spark-submit \
--master local[*] \
--jars /path/to/tikv-client-java-3.2.0-SNAPSHOT.jar \
--jars /path/to/tikv-client-java-3.3.0-SNAPSHOT.jar \
--class org.tikv.datasources.sst.example.SSTDataSourceExample \
sst-data-source/target/sst-data-source-0.0.1-SNAPSHOT.jar \
hdfs:///path/to/sst/
Expand Down
36 changes: 36 additions & 0 deletions sst-data-source/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,18 @@
<packaging>jar</packaging>
<name>Spark SST Data Source</name>

<properties>
<rocksdb.version>6.22.1.1</rocksdb.version>
</properties>

<dependencies>
<dependency>
<groupId>org.rocksdb</groupId>
<artifactId>rocksdbjni</artifactId>
<version>${rocksdb.version}</version>
</dependency>
</dependencies>

<build>
<extensions>
<extension>
Expand All @@ -36,6 +48,30 @@
</extension>
</extensions>
<plugins>
<!--- Needs to shade Protobuf 3 since other projects might use other version -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
</transformers>
<artifactSet>
<includes>
<include>org.rocksdb:rocksdbjni</include>
</includes>
</artifactSet>
</configuration>
</execution>
</executions>
</plugin>
<!-- Scala Format Plug-in -->
<plugin>
<groupId>org.antipathy</groupId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Copyright 2022 TiKV Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.tikv.datasources.br;

import java.io.Serializable;
import org.rocksdb.Options;
import org.rocksdb.ReadOptions;
import org.tikv.common.exception.SSTDecodeException;
import org.tikv.kvproto.Brpb;

public class BackupDecoder implements Serializable {
private final Brpb.BackupMeta backupMeta;
private final boolean ttlEnabled;
private final KVDecoder kvDecoder;

public BackupDecoder(Brpb.BackupMeta backupMeta) throws SSTDecodeException {
this.backupMeta = backupMeta;
this.ttlEnabled = false;
this.kvDecoder = initKVDecoder();
}

public BackupDecoder(Brpb.BackupMeta backupMeta, boolean ttlEnabled) throws SSTDecodeException {
this.backupMeta = backupMeta;
this.ttlEnabled = ttlEnabled;
this.kvDecoder = initKVDecoder();
}

private KVDecoder initKVDecoder() throws SSTDecodeException {
if (backupMeta.getIsRawKv()) {
if ("V1".equals(backupMeta.getApiVersion().name())) {
return new RawKVDecoderV1(ttlEnabled);
} else {
throw new SSTDecodeException(
"does not support decode APIVersion " + backupMeta.getApiVersion().name());
}
} else {
throw new SSTDecodeException("TxnKV is not supported yet!");
}
}

public org.tikv.datasources.br.SSTDecoder decodeSST(String sstFilePath) {
return decodeSST(sstFilePath, new Options(), new ReadOptions());
}

public org.tikv.datasources.br.SSTDecoder decodeSST(String sstFilePath, Options options, ReadOptions readOptions) {
return new SSTDecoder(sstFilePath, kvDecoder, options, readOptions);
}

public Brpb.BackupMeta getBackupMeta() {
return backupMeta;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright 2022 TiKV Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.tikv.datasources.br;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import org.tikv.kvproto.Brpb;

public class BackupMetaDecoder {
private final Brpb.BackupMeta backupMeta;

public BackupMetaDecoder(byte[] data) throws org.tikv.shade.com.google.protobuf.InvalidProtocolBufferException {
this.backupMeta = Brpb.BackupMeta.parseFrom(data);
}

public Brpb.BackupMeta getBackupMeta() {
return backupMeta;
}

public static BackupMetaDecoder parse(String backupMetaFilePath) throws IOException {
byte[] data = Files.readAllBytes(new File(backupMetaFilePath).toPath());
return new BackupMetaDecoder(data);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright 2022 TiKV Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.tikv.datasources.br;

import com.google.protobuf.ByteString;
import java.io.Serializable;

public interface KVDecoder extends Serializable {
ByteString decodeKey(byte[] key);

ByteString decodeValue(byte[] value);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright 2022 TiKV Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.tikv.datasources.br;

import com.google.protobuf.ByteString;
import java.util.Arrays;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RawKVDecoderV1 implements KVDecoder {
private static final Logger logger = LoggerFactory.getLogger(SSTIterator.class);

private final boolean ttlEnabled;

public RawKVDecoderV1(boolean ttlEnabled) {
this.ttlEnabled = ttlEnabled;
}

@Override
public ByteString decodeKey(byte[] key) {
if (key == null || key.length == 0) {
logger.warn(
"skip Key-Value pair because key == null || key.length == 0, key = "
+ Arrays.toString(key));
return null;
} else if (key[0] != 'z') {
logger.warn("skip Key-Value pair because key[0] != 'z', key = " + Arrays.toString(key));
return null;
}
return ByteString.copyFrom(key, 1, key.length - 1);
}

@Override
public ByteString decodeValue(byte[] value) {
if (!ttlEnabled) {
return ByteString.copyFrom(value);
} else {
return ByteString.copyFrom(value).substring(0, value.length - 8);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Copyright 2022 TiKV Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.tikv.datasources.br;

import com.google.protobuf.ByteString;
import java.util.Iterator;
import org.rocksdb.Options;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDBException;
import org.rocksdb.SstFileReader;
import org.rocksdb.SstFileReaderIterator;
import org.tikv.common.util.Pair;

public class SSTDecoder {
private final String filePath;
private final KVDecoder kvDecoder;
private final Options options;
private final ReadOptions readOptions;

private SstFileReader sstFileReader;
private SstFileReaderIterator iterator;

public SSTDecoder(String sstFilePath, KVDecoder kvDecoder) {
this.filePath = sstFilePath;
this.kvDecoder = kvDecoder;
this.options = new Options();
this.readOptions = new ReadOptions();
}

public SSTDecoder(
String filePath, KVDecoder kvDecoder, Options options, ReadOptions readOptions) {
this.filePath = filePath;
this.kvDecoder = kvDecoder;
this.options = options;
this.readOptions = readOptions;
}

public synchronized Iterator<Pair<ByteString, ByteString>> getIterator() throws RocksDBException {
if (sstFileReader != null || iterator != null) {
throw new RocksDBException("File already opened!");
}

sstFileReader = new SstFileReader(new Options());
sstFileReader.open(filePath);
iterator = sstFileReader.newIterator(new ReadOptions());
return new SSTIterator(iterator, kvDecoder);
}

public synchronized void close() {
try {
if (iterator != null) {
iterator.close();
}
} finally {
iterator = null;
}

try {
if (sstFileReader != null) {
sstFileReader.close();
}
} finally {
sstFileReader = null;
}
}

public String getFilePath() {
return filePath;
}

public Options getOptions() {
return options;
}

public ReadOptions getReadOptions() {
return readOptions;
}
}
Loading