diff --git a/.gitignore b/.gitignore index f6c0392d9e..aa65bacfc8 100644 --- a/.gitignore +++ b/.gitignore @@ -22,3 +22,4 @@ output* # ignore standard Mac OS X files/dirs .DS_Store +/differentbin/ diff --git a/bin/bindings.properties b/bin/bindings.properties old mode 100644 new mode 100755 index ec6262b441..f84c4d3a7a --- a/bin/bindings.properties +++ b/bin/bindings.properties @@ -79,3 +79,4 @@ solr:com.yahoo.ycsb.db.solr.SolrClient solr6:com.yahoo.ycsb.db.solr6.SolrClient tarantool:com.yahoo.ycsb.db.TarantoolClient tablestore:com.yahoo.ycsb.db.tablestore.TableStoreClient +voltdb:com.yahoo.ycsb.db.voltdb.VoltClient4 diff --git a/bin/ycsb.bat b/bin/ycsb.bat old mode 100644 new mode 100755 diff --git a/distribution/pom.xml b/distribution/pom.xml index 816476ff72..fa45ca56b4 100644 --- a/distribution/pom.xml +++ b/distribution/pom.xml @@ -286,6 +286,12 @@ LICENSE file. ${project.version} --> + + com.yahoo.ycsb + voltdb-binding + ${project.version} + + diff --git a/pom.xml b/pom.xml index 1c83e87057..9019761bb4 100644 --- a/pom.xml +++ b/pom.xml @@ -110,6 +110,7 @@ LICENSE file. 0.8.0 0.81 4.8.0 + 9.1.1 @@ -168,6 +169,7 @@ LICENSE file. tarantool tablestore + voltdb diff --git a/voltdb/README.md b/voltdb/README.md new file mode 100644 index 0000000000..afab79cf46 --- /dev/null +++ b/voltdb/README.md @@ -0,0 +1,86 @@ + +# YCSB for VoltDB + +![VoltDB Logo](https://www.voltdb.com/wp-content/uploads/2016/10/VoltDB-logo-320x132.jpg?ycsb=true "VoltDB") + +## README + +This README describes how to run YCSB on VoltDB. + +Here at VoltDB we use 4 machines for testing - 1 client and a 3 node cluster. + +### 1. Install Java on all the machines involved. + +VoltDB uses Java. Either [Oracle Java](https://www.oracle.com/technetwork/java/javase/downloads/index.html) or [OpenJDK](https://openjdk.java.net/) will work. + + +### 3. Install and configure VoltDB + +If you don't already have a copy of VoltDB you should [download](https://www.voltdb.com/try-voltdb/) and install it. +Make sure you know the hostnames/ip addresses of all the nodes in the cluster and that [port 21212](https://docs.voltdb.com/AdminGuide/HostConfigPortOpts.php) is open for your client. + +A representative VoltDB cluster would have 3 nodes and a '[K factor](https://docs.voltdb.com/UsingVoltDB/KSafeEnable.php)' of 1. This configuration allows work to continue if a server dies. + +Note: If you contact us we can give you access to AWS Cloudformation scripts and a matching AMI to create a cluster for you. + +### 4. Install YCSB + +Download the [latest YCSB](https://github.com/brianfrankcooper/YCSB/releases/latest) file. Follow the instructions. + +### 5. Configure VoltDB parameters + +Create a file called (for example) voltdb.properties: + + recordcount=10000000 + operationcount=10000000 + voltdb.servers=localhost + threadcount=1 + maxexecutiontime=300 + +Other possible entries would be: + +- `voltdb.user` + - Username. Only needed if username/passwords enabled. +- `voltdb.password` + - Password. Only needed if username/passwords enabled. +- `voltdb.ratelimit` + - Maximum number of transactions allowed per second per 50 threads - e.g. 'voltdb.ratelimit=70000'. Note that as you increase the workload you eventually get to a point where throwing more and more transactions at a given configuration is counterproductive. For the three node configuration we mentioned above 70000 would be a good starting point for this value. +- `voltdb.scanall` + - When set to 'yes' uses a single query to return data for 'Scan' operations ('workload e') instead of a separate query per partition. The later is much more scalable but generates more network traffic. + +### 6. Run YCSB + +See: [Running a Workload](https://github.com/brianfrankcooper/YCSB/wiki/Running-a-Workload) + +Make sure you keep an eye on the voltdb GUI. If you installed VoltDB on the same computer you are running YCSB on it will be at: + + http://localhost:8080/ + +First load the data: + + bin/ycsb.sh load voltdb -P workloads/workloada -P voltdb.properties + +Then run the different workloads - 'a' through 'e': + + bin/ycsb.sh run voltdb -P workloads/workloada -P voltdb.properties + bin/ycsb.sh run voltdb -P workloads/workloadb -P voltdb.properties + bin/ycsb.sh run voltdb -P workloads/workloadc -P voltdb.properties + bin/ycsb.sh run voltdb -P workloads/workloadd -P voltdb.properties + bin/ycsb.sh run voltdb -P workloads/workloade -P voltdb.properties + \ No newline at end of file diff --git a/voltdb/pom.xml b/voltdb/pom.xml new file mode 100644 index 0000000000..0ddd0a6691 --- /dev/null +++ b/voltdb/pom.xml @@ -0,0 +1,74 @@ + + + + + 4.0.0 + + com.yahoo.ycsb + binding-parent + 0.17.0-SNAPSHOT + ../binding-parent + + + voltdb-binding + VoltDB Binding + jar + + + com.yahoo.ycsb + core + ${project.version} + provided + + + junit + junit + 4.12 + test + + + org.slf4j + slf4j-api + 1.7.28 + + + org.apache.logging.log4j + log4j-api + 2.7 + + + org.apache.logging.log4j + log4j-core + 2.7 + + + org.apache.logging.log4j + log4j-slf4j-impl + 2.7 + + + + org.voltdb + voltdbclient + 9.1.1 + + + + org.voltdb + voltdb + 9.1.1 + provided + + + + diff --git a/voltdb/src/main/java/com/yahoo/ycsb/db/voltdb/ConnectionHelper.java b/voltdb/src/main/java/com/yahoo/ycsb/db/voltdb/ConnectionHelper.java new file mode 100644 index 0000000000..6d56b0038c --- /dev/null +++ b/voltdb/src/main/java/com/yahoo/ycsb/db/voltdb/ConnectionHelper.java @@ -0,0 +1,179 @@ +/** + * Copyright (c) 2015-2019 YCSB contributors. All rights reserved. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. You + * may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. See accompanying + * LICENSE file. + */ + +/* + * VoltDB Connection Utility. + */ +package com.yahoo.ycsb.db.voltdb; + +import java.io.IOException; +import java.net.Socket; +import java.util.concurrent.CountDownLatch; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.voltdb.client.Client; +import org.voltdb.client.ClientConfig; +import org.voltdb.client.ClientFactory; + +/** + * Help class to create VoltDB connections for YCSB benchmark. + */ +public final class ConnectionHelper { + + /** + * Default port for VoltDB. + */ + private static final int VOLTDB_DEFAULT_PORT = 21212; + + /** + * hidden constructor. + */ + private ConnectionHelper() { + + } + + /** + * Creates a factory used to connect to a VoltDB instance. (Note that if a + * corresponding connection exists, all parameters other than 'servers' are + * ignored) + * + * @param servers The comma separated list of VoltDB servers in + * hostname[:port] format that the instance will use. + * @param user The username for the connection + * @param password The password for the specified user + * @param ratelimit A limit on the number of transactions per second for the + * VoltDB instance + * @return The existing factory if a corresponding connection has already been + * created; the newly created one otherwise. + * @throws IOException Throws if a connection is already open with a + * different server string. + * @throws InterruptedException + */ + public static Client createConnection(String servers, String user, String password, + int ratelimit) throws IOException, InterruptedException { + + ClientConfig config = new ClientConfig(user, password); + config.setMaxTransactionsPerSecond(ratelimit); + Client client = ClientFactory.createClient(config); + + // Note that in VoltDB there is a distinction between creating an instance of a client + // and actually connecting to the DB... + connect(client, servers); + + return client; + } + + + /** + * Connect to a single server with retry. Limited exponential backoff. No + * timeout. This will run until the process is killed if it's not able to + * connect. + * + * @param server hostname:port or just hostname (hostname can be ip). + */ + private static void connectToOneServerWithRetry(final Client client, String server) { + + Logger logger = LoggerFactory.getLogger(ConnectionHelper.class); + + int sleep = 1000; + while (true) { + try { + client.createConnection(server); + break; + } catch (Exception e) { + logger.error("Connection failed - retrying in %d second(s).\n", sleep / 1000); + try { + Thread.sleep(sleep); + } catch (java.lang.InterruptedException e2) { + logger.error(e2.getMessage()); + } + if (sleep < 8000) { + sleep += sleep; + } + } + } + + logger.info("Connected to VoltDB node at:" + server); + } + + /** + * See if DB servers are present on the network. + * + * @return true or false + */ + public static boolean checkDBServers(String servernames) { + + String[] serverNamesArray = servernames.split(","); + + boolean dbThere = false; + + Socket socket = null; + try { + // Connect + socket = new Socket(serverNamesArray[0], VOLTDB_DEFAULT_PORT); + dbThere = true; + } catch (IOException connectFailed) { + dbThere = false; + } finally { + if (socket != null) { + try { + socket.close(); + } catch (IOException ignore) { + // Ignore. + } + } + socket = null; + } + + return dbThere; + + } + + /** + * Connect to a set of servers in parallel. Each will retry until connection. + * This call will block until all have connected. + * + * @param servers A comma separated list of servers using the hostname:port + * syntax (where :port is optional). + * @throws InterruptedException if anything bad happens with the threads. + */ + private static void connect(final Client client, String servers) throws InterruptedException { + + Logger logger = LoggerFactory.getLogger(ConnectionHelper.class); + + logger.info("Connecting to VoltDB..."); + + String[] serverArray = servers.split(","); + final CountDownLatch connections = new CountDownLatch(serverArray.length); + + // use a new thread to connect to each server + for (final String server : serverArray) { + new Thread(new Runnable() { + @Override + public void run() { + connectToOneServerWithRetry(client, server); + connections.countDown(); + } + }).start(); + } + // block until all have connected + connections.await(); + } + + +} diff --git a/voltdb/src/main/java/com/yahoo/ycsb/db/voltdb/VoltClient4.java b/voltdb/src/main/java/com/yahoo/ycsb/db/voltdb/VoltClient4.java new file mode 100644 index 0000000000..0d35357e61 --- /dev/null +++ b/voltdb/src/main/java/com/yahoo/ycsb/db/voltdb/VoltClient4.java @@ -0,0 +1,304 @@ +/** + * Copyright (c) 2015-2019 YCSB contributors. All rights reserved. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. You + * may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. See accompanying + * LICENSE file. + */ + +/* + * This client provideds a wrapper layer for running the Yahoo Cloud Serving + * Benchmark (YCSB) against VoltDB. This benchmark runs a synchronous client + * with a mix of the operations provided below. YCSB is open-source, and may + * be found at https://github.com/brianfrankcooper/YCSB. The YCSB jar must be + * in your classpath to compile this client. + */ +package com.yahoo.ycsb.db.voltdb; + +import java.nio.ByteBuffer; +import java.nio.charset.Charset; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.Vector; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.voltdb.VoltTable; +import org.voltdb.client.Client; +import org.voltdb.client.ClientResponse; +import org.voltdb.client.ClientResponseWithPartitionKey; +import org.voltdb.client.NoConnectionsException; + +import com.yahoo.ycsb.ByteArrayByteIterator; +import com.yahoo.ycsb.ByteIterator; +import com.yahoo.ycsb.DB; +import com.yahoo.ycsb.DBException; +import com.yahoo.ycsb.Status; +import com.yahoo.ycsb.db.voltdb.sortedvolttable.VoltDBTableSortedMergeWrangler; + +/** + * A client that can be used by YCSB to work with VoltDB. + */ +public class VoltClient4 extends DB { + + private Client mclient; + private byte[] mworkingData; + private ByteBuffer mwriteBuf; + private boolean useScanAll = false; + private static final Charset UTF8 = Charset.forName("UTF-8"); + + private Logger logger = LoggerFactory.getLogger(VoltClient4.class); + + private YCSBSchemaBuilder ysb = null; + + @Override + public void init() throws DBException { + Properties props = getProperties(); + String servers = props.getProperty("voltdb.servers", "localhost"); + String user = props.getProperty("voltdb.user", ""); + String password = props.getProperty("voltdb.password", ""); + String strLimit = props.getProperty("voltdb.ratelimit"); + String useScanAllParam = props.getProperty("voltdb.scanall", "no"); + + if (useScanAllParam.equalsIgnoreCase("YES")) { + useScanAll = true; + } + + int ratelimit = strLimit != null ? Integer.parseInt(strLimit) : Integer.MAX_VALUE; + try { + mclient = ConnectionHelper.createConnection(servers, user, password, ratelimit); + + ysb = StaticHolder.INSTANCE; + ysb.loadClassesAndDDLIfNeeded(mclient); + + } catch (Exception e) { + logger.error("Error while creating connection: ", e); + throw new DBException(e.getMessage()); + } + mworkingData = new byte[1024 * 1024]; + mwriteBuf = ByteBuffer.wrap(mworkingData); + } + + /** + * @return true if we have a live DB connection + */ + public boolean hasConnection() { + + if (mclient != null && mclient.getConnectedHostList().size() > 0) { + return true; + } + + return false; + + } + + @Override + public void cleanup() throws DBException { + + // If VoltDB client exists and has a live connection... + if (mclient != null && mclient.getConnectedHostList().size() > 0) { + + try { + mclient.drain(); + mclient.close(); + } catch (NoConnectionsException e) { + logger.error(e.getMessage(), e); + } catch (InterruptedException e) { + logger.error(e.getMessage(), e); + } + + mclient = null; + } + + + } + + @Override + public Status delete(String keyspace, String key) { + try { + ClientResponse response = mclient.callProcedure("STORE.delete", key, keyspace.getBytes(UTF8)); + return response.getStatus() == ClientResponse.SUCCESS ? Status.OK : Status.ERROR; + } catch (Exception e) { + logger.error("Error while deleting row", e); + return Status.ERROR; + } + } + + @Override + public Status insert(String keyspace, String key, Map columns) { + return update(keyspace, key, columns); + } + + @Override + public Status read(String keyspace, String key, Set columns, Map result) { + try { + ClientResponse response = mclient.callProcedure("Get", keyspace.getBytes(UTF8), key); + if (response.getStatus() != ClientResponse.SUCCESS) { + return Status.ERROR; + } + VoltTable table = response.getResults()[0]; + if (table.advanceRow()) { + unpackRowData(table, columns, result); + } + return Status.OK; + } catch (Exception e) { + logger.error("Error while GETing row", e); + return Status.ERROR; + } + } + + @Override + public Status scan(String keyspace, String lowerBound, int recordCount, Set columns, + Vector> result) { + + try { + + if (useScanAll) { + + byte[] ks = keyspace.getBytes(UTF8); + ClientResponse response = mclient.callProcedure("ScanAll", ks, lowerBound.getBytes(UTF8), recordCount); + + if (response.getStatus() != ClientResponse.SUCCESS) { + return Status.ERROR; + } + + result.ensureCapacity(recordCount); + + VoltTable outputTable = response.getResults()[0]; + outputTable.resetRowPosition(); + while (outputTable.advanceRow()) { + result.add(unpackRowDataHashMap(outputTable, columns)); + } + + } else { + + byte[] ks = keyspace.getBytes(UTF8); + ClientResponseWithPartitionKey[] response = mclient.callAllPartitionProcedure("Scan", ks, + lowerBound.getBytes(UTF8), recordCount); + + for (int i = 0; i < response.length; i++) { + if (response[i].response.getStatus() != ClientResponse.SUCCESS) { + return Status.ERROR; + } + + } + + result.ensureCapacity(recordCount); + + VoltDBTableSortedMergeWrangler smw = new VoltDBTableSortedMergeWrangler(response); + + VoltTable outputTable = smw.getSortedTable(1, recordCount); + outputTable.resetRowPosition(); + while (outputTable.advanceRow()) { + result.add(unpackRowDataHashMap(outputTable, columns)); + } + + } + + return Status.OK; + } catch (Exception e) { + logger.error("Error while calling SCAN", e); + return Status.ERROR; + } + } + + @Override + public Status update(String keyspace, String key, Map columns) { + try { + ClientResponse response = mclient.callProcedure("Put", keyspace.getBytes(UTF8), key, packRowData(columns)); + return response.getStatus() == ClientResponse.SUCCESS ? Status.OK : Status.ERROR; + } catch (Exception e) { + logger.error("Error while calling Update", e); + return Status.ERROR; + } + } + + private byte[] packRowData(Map columns) { + mwriteBuf.clear(); + mwriteBuf.putInt(columns.size()); + for (String key : columns.keySet()) { + byte[] k = key.getBytes(UTF8); + mwriteBuf.putInt(k.length); + mwriteBuf.put(k); + + ByteIterator v = columns.get(key); + int len = (int) v.bytesLeft(); + mwriteBuf.putInt(len); + v.nextBuf(mworkingData, mwriteBuf.position()); + mwriteBuf.position(mwriteBuf.position() + len); + } + + byte[] data = new byte[mwriteBuf.position()]; + System.arraycopy(mworkingData, 0, data, 0, data.length); + return data; + } + + private Map unpackRowData(VoltTable data, Set fields, + Map result) { + byte[] rowData = data.getVarbinary(0); + ByteBuffer buf = ByteBuffer.wrap(rowData); + int nFields = buf.getInt(); + return unpackRowData(rowData, buf, nFields, fields, result); + } + + private Map unpackRowData(byte[] rowData, ByteBuffer buf, int nFields, Set fields, + Map result) { + for (int i = 0; i < nFields; i++) { + int len = buf.getInt(); + int off = buf.position(); + String key = new String(rowData, off, len, UTF8); + buf.position(off + len); + len = buf.getInt(); + off = buf.position(); + if (fields == null || fields.contains(key)) { + result.put(key, new ByteArrayByteIterator(rowData, off, len)); + } + buf.position(off + len); + } + return result; + } + + private HashMap unpackRowDataHashMap(VoltTable data, Set fields) { + byte[] rowData = data.getVarbinary(0); + ByteBuffer buf = ByteBuffer.wrap(rowData); + int nFields = buf.getInt(); + int size = fields != null ? Math.min(fields.size(), nFields) : nFields; + HashMap res = new HashMap(size, (float) 1.25); + return unpackRowDataHashMap(rowData, buf, nFields, fields, res); + } + + private HashMap unpackRowDataHashMap(byte[] rowData, ByteBuffer buf, int nFields, + Set fields, HashMap result) { + for (int i = 0; i < nFields; i++) { + int len = buf.getInt(); + int off = buf.position(); + String key = new String(rowData, off, len, UTF8); + buf.position(off + len); + len = buf.getInt(); + off = buf.position(); + if (fields == null || fields.contains(key)) { + result.put(key, new ByteArrayByteIterator(rowData, off, len)); + } + buf.position(off + len); + } + return result; + } + + private static class StaticHolder { + static final YCSBSchemaBuilder INSTANCE = new YCSBSchemaBuilder(); + } + + + +} diff --git a/voltdb/src/main/java/com/yahoo/ycsb/db/voltdb/YCSBSchemaBuilder.java b/voltdb/src/main/java/com/yahoo/ycsb/db/voltdb/YCSBSchemaBuilder.java new file mode 100644 index 0000000000..94eb79379e --- /dev/null +++ b/voltdb/src/main/java/com/yahoo/ycsb/db/voltdb/YCSBSchemaBuilder.java @@ -0,0 +1,232 @@ +/** + * Copyright (c) 2015-2019 YCSB contributors. All rights reserved. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. You + * may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. See accompanying + * LICENSE file. + */ +package com.yahoo.ycsb.db.voltdb; + +import java.io.BufferedInputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.Charset; +import java.nio.file.Files; +import java.util.jar.Attributes; +import java.util.jar.JarEntry; +import java.util.jar.JarOutputStream; +import java.util.jar.Manifest; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.voltdb.client.Client; +import org.voltdb.client.ClientResponse; +import org.voltdb.client.ProcCallException; + +/** + * Utility class to build the YCSB schema. + * + */ +public final class YCSBSchemaBuilder { + + private static final String PROCEDURE_GET_WAS_NOT_FOUND = "Procedure Get was not found"; + + private static final Charset UTF8 = Charset.forName("UTF-8"); + + private final String createTableDDL = "CREATE TABLE Store (keyspace VARBINARY(128) NOT NULL\n" + + ", key VARCHAR(128) NOT NULL, value VARBINARY(2056) NOT NULL\n" + + ", PRIMARY KEY (key, keyspace));"; + + private final String partitionTableDDL = "PARTITION TABLE Store ON COLUMN key;\n"; + + private final String createGetDDL = "CREATE PROCEDURE Get PARTITION ON TABLE Store COLUMN key PARAMETER 1\n" + + "AS SELECT value FROM Store WHERE keyspace = ? AND key = ?;"; + + private final String createPutDDL = "CREATE PROCEDURE PARTITION ON TABLE Store COLUMN key PARAMETER 1\n" + + "FROM CLASS com.yahoo.ycsb.db.voltdb.procs.Put;"; + + private final String createScanDDL = "CREATE PROCEDURE PARTITION ON TABLE Store COLUMN key \n" + + "FROM CLASS com.yahoo.ycsb.db.voltdb.procs.Scan;"; + + private final String createScanAllDDL = "CREATE PROCEDURE \n" + "FROM CLASS com.yahoo.ycsb.db.voltdb.procs.ScanAll;"; + + private final String[] ddlStatements = {createTableDDL, partitionTableDDL }; + + private final String[] procStatements = {createGetDDL, createPutDDL, createScanDDL, createScanAllDDL }; + + private final String[] jarFiles = {"Put.class", "Scan.class", "ScanAll.class", "ByteWrapper.class" }; + + private final String jarFileName = "ycsb-procs.jar"; + + private Logger logger = LoggerFactory.getLogger(YCSBSchemaBuilder.class); + + /** + * Utility class to build the YCSB schema. + * + * @author srmadscience / VoltDB + * + */ + YCSBSchemaBuilder() { + super(); + } + + /** + * See if we think YCSB Schema already exists... + * + * @return true if the 'Get' procedure exists and takes one string as a + * parameter. + */ + public boolean schemaExists(Client voltClient) { + + final String testString = "Test"; + boolean schemaExists = false; + + try { + ClientResponse response = voltClient.callProcedure("Get", testString.getBytes(UTF8), testString); + + if (response.getStatus() == ClientResponse.SUCCESS) { + // YCSB Database exists... + schemaExists = true; + } else { + // If we'd connected to a copy of VoltDB without the schema and tried to call Get + // we'd have got a ProcCallException + logger.error("Error while calling schemaExists(): " + response.getStatusString()); + schemaExists = false; + } + } catch (ProcCallException pce) { + schemaExists = false; + + // Sanity check: Make sure we've got the *right* ProcCallException... + if (!pce.getMessage().equals(PROCEDURE_GET_WAS_NOT_FOUND)) { + logger.error("Got unexpected Exception while calling schemaExists()", pce); + } + + } catch (Exception e) { + logger.error("Error while creating classes.", e); + schemaExists = false; + } + + return schemaExists; + } + + /** + * Load classes and DDL required by YCSB. + * + * @throws Exception + */ + public synchronized void loadClassesAndDDLIfNeeded(Client voltClient) throws Exception { + + if (schemaExists(voltClient)) { + return; + } + + File tempDir = Files.createTempDirectory("voltdbYCSB").toFile(); + + if (!tempDir.canWrite()) { + throw new Exception("Temp Directory (from Files.createTempDirectory()) '" + + tempDir.getAbsolutePath() + "' is not writable"); + } + + ClientResponse cr; + + for (int i = 0; i < ddlStatements.length; i++) { + try { + cr = voltClient.callProcedure("@AdHoc", ddlStatements[i]); + if (cr.getStatus() != ClientResponse.SUCCESS) { + throw new Exception("Attempt to execute '" + ddlStatements[i] + "' failed:" + cr.getStatusString()); + } + logger.info(ddlStatements[i]); + } catch (Exception e) { + + if (e.getMessage().indexOf("object name already exists") > -1) { + // Someone else has done this... + return; + } + + throw (e); + } + } + + logger.info("Creating JAR file in " + tempDir + File.separator + jarFileName); + Manifest manifest = new Manifest(); + manifest.getMainAttributes().put(Attributes.Name.MANIFEST_VERSION, "1.0"); + JarOutputStream newJarFile = new JarOutputStream(new FileOutputStream(tempDir + File.separator + jarFileName), + manifest); + + for (int i = 0; i < jarFiles.length; i++) { + InputStream is = getClass().getResourceAsStream("/com/yahoo/ycsb/db/voltdb/procs/" + jarFiles[i]); + add("com/yahoo/ycsb/db/voltdb/procs/" + jarFiles[i], is, newJarFile); + } + + newJarFile.close(); + File file = new File(tempDir + File.separator + jarFileName); + + byte[] jarFileContents = new byte[(int) file.length()]; + FileInputStream fis = new FileInputStream(file); + fis.read(jarFileContents); + fis.close(); + logger.info("Calling @UpdateClasses to load JAR file containing procedures"); + + cr = voltClient.callProcedure("@UpdateClasses", jarFileContents, null); + if (cr.getStatus() != ClientResponse.SUCCESS) { + throw new Exception("Attempt to execute UpdateClasses failed:" + cr.getStatusString()); + } + + for (int i = 0; i < procStatements.length; i++) { + logger.info(procStatements[i]); + cr = voltClient.callProcedure("@AdHoc", procStatements[i]); + if (cr.getStatus() != ClientResponse.SUCCESS) { + throw new Exception("Attempt to execute '" + procStatements[i] + "' failed:" + cr.getStatusString()); + } + } + + } + + /** + * Add an entry to our JAR file. + * + * @param fileName + * @param source + * @param target + * @throws IOException + */ + private void add(String fileName, InputStream source, JarOutputStream target) throws IOException { + BufferedInputStream in = null; + try { + + JarEntry entry = new JarEntry(fileName.replace("\\", "/")); + entry.setTime(System.currentTimeMillis()); + target.putNextEntry(entry); + in = new BufferedInputStream(source); + + byte[] buffer = new byte[1024]; + while (true) { + int count = in.read(buffer); + if (count == -1) { + break; + } + + target.write(buffer, 0, count); + } + target.closeEntry(); + } finally { + if (in != null) { + in.close(); + } + + } + } + +} diff --git a/voltdb/src/main/java/com/yahoo/ycsb/db/voltdb/package-info.java b/voltdb/src/main/java/com/yahoo/ycsb/db/voltdb/package-info.java new file mode 100644 index 0000000000..b92ce13d40 --- /dev/null +++ b/voltdb/src/main/java/com/yahoo/ycsb/db/voltdb/package-info.java @@ -0,0 +1,21 @@ +/** + * Copyright (c) 2015-2019 YCSB contributors. All rights reserved. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. You + * may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. See accompanying + * LICENSE file. + */ +/** + * VoltDB integration with YCSB. + * + */ +package com.yahoo.ycsb.db.voltdb; diff --git a/voltdb/src/main/java/com/yahoo/ycsb/db/voltdb/procs/ByteWrapper.java b/voltdb/src/main/java/com/yahoo/ycsb/db/voltdb/procs/ByteWrapper.java new file mode 100644 index 0000000000..a59b815fbb --- /dev/null +++ b/voltdb/src/main/java/com/yahoo/ycsb/db/voltdb/procs/ByteWrapper.java @@ -0,0 +1,66 @@ +/** + * Copyright (c) 2015-2019 YCSB contributors. All rights reserved. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. You + * may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. See accompanying + * LICENSE file. + */ + +package com.yahoo.ycsb.db.voltdb.procs; + +/** + * Utility class to map data structures used by YCSB to a VoltDB VARBINARY column. + */ +class ByteWrapper { + private byte[] marr; + private int moff; + private int mlen; + + ByteWrapper(byte[] arr, int off, int len) { + marr = arr; + moff = off; + mlen = len; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (!(obj instanceof ByteWrapper)) { + return false; + } + ByteWrapper that = (ByteWrapper) obj; + if (this.mlen != that.mlen) { + return false; + } + for (int i = 0; i < this.mlen; i++) { + if (this.marr[this.moff + i] != that.marr[that.moff + i]) { + return false; + } + } + return true; + } + + @Override + public int hashCode() { + if (this.marr == null) { + return 0; + } + + int res = 1; + for (int i = 0; i < mlen; i++) { + res = 31 * res + marr[moff + i]; + } + return res; + } +} \ No newline at end of file diff --git a/voltdb/src/main/java/com/yahoo/ycsb/db/voltdb/procs/Put.java b/voltdb/src/main/java/com/yahoo/ycsb/db/voltdb/procs/Put.java new file mode 100644 index 0000000000..d31c145f3f --- /dev/null +++ b/voltdb/src/main/java/com/yahoo/ycsb/db/voltdb/procs/Put.java @@ -0,0 +1,103 @@ +/** + * Copyright (c) 2015-2019 YCSB contributors. All rights reserved. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. You + * may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. See accompanying + * LICENSE file. + */ + +package com.yahoo.ycsb.db.voltdb.procs; + +import java.nio.ByteBuffer; +import java.util.HashSet; + +import org.voltdb.SQLStmt; +import org.voltdb.VoltProcedure; +import org.voltdb.VoltTable; + +/** + * + * Update a value in STORE. + * + */ +public class Put extends VoltProcedure { + private final SQLStmt selectStmt = new SQLStmt("SELECT value FROM Store WHERE keyspace = ? AND key = ?"); + private final SQLStmt insertStmt = new SQLStmt("INSERT INTO Store VALUES (?, ?, ?)"); + private final SQLStmt updateStmt = new SQLStmt("UPDATE Store SET value = ? WHERE keyspace = ? AND key = ?"); + + public long run(byte[] keyspace, String key, byte[] data) { + voltQueueSQL(selectStmt, keyspace, key); + VoltTable res = voltExecuteSQL()[0]; + if (res.advanceRow()) { + voltQueueSQL(updateStmt, merge(res.getVarbinary(0), data), keyspace, key); + } else { + voltQueueSQL(insertStmt, keyspace, key, data); + } + voltExecuteSQL(true); + return 0L; + } + + private byte[] merge(byte[] dest, byte[] src) { + HashSet mergeSet = new HashSet(); + ByteBuffer buf = ByteBuffer.wrap(src); + int nSrc = buf.getInt(); + for (int i = 0; i < nSrc; i++) { + int len = buf.getInt(); + int off = buf.position(); + mergeSet.add(new ByteWrapper(src, off, len)); + buf.position(off + len); + len = buf.getInt(); + buf.position(buf.position() + len); + } + + byte[] merged = new byte[src.length + dest.length]; + ByteBuffer out = ByteBuffer.wrap(merged); + + buf = ByteBuffer.wrap(dest); + int nDest = buf.getInt(); + int nFields = nSrc + nDest; + out.putInt(nFields); + + int blockStart = 4; + int blockEnd = 4; + for (int i = 0; i < nDest; i++) { + int len = buf.getInt(); + int off = buf.position(); + boolean flushBlock = mergeSet.contains(new ByteWrapper(dest, off, len)); + buf.position(off + len); + len = buf.getInt(); + buf.position(buf.position() + len); + if (flushBlock) { + if (blockStart < blockEnd) { + out.put(dest, blockStart, blockEnd - blockStart); + } + nFields--; + blockStart = buf.position(); + } + blockEnd = buf.position(); + } + if (blockStart < blockEnd) { + out.put(dest, blockStart, blockEnd - blockStart); + } + out.put(src, 4, src.length - 4); + + int length = out.position(); + if (nFields != nSrc + nDest) { + out.position(0); + out.putInt(nFields); + } + + byte[] res = new byte[length]; + System.arraycopy(merged, 0, res, 0, length); + return res; + } +} diff --git a/voltdb/src/main/java/com/yahoo/ycsb/db/voltdb/procs/Scan.java b/voltdb/src/main/java/com/yahoo/ycsb/db/voltdb/procs/Scan.java new file mode 100644 index 0000000000..65ba8c69ef --- /dev/null +++ b/voltdb/src/main/java/com/yahoo/ycsb/db/voltdb/procs/Scan.java @@ -0,0 +1,44 @@ +/** + * Copyright (c) 2015-2019 YCSB contributors. All rights reserved. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. You + * may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. See accompanying + * LICENSE file. + */ + +package com.yahoo.ycsb.db.voltdb.procs; + +import org.voltdb.SQLStmt; +import org.voltdb.VoltProcedure; +import org.voltdb.VoltTable; + +/** + * + * Query STORE using a single partition query. + * + */ +public class Scan extends VoltProcedure { + + private final SQLStmt getBddStmt = new SQLStmt( + "SELECT value, key FROM Store WHERE keyspace = ? AND key >= ? ORDER BY key, keyspace LIMIT ?"); + private final SQLStmt getUnbddStmt = new SQLStmt( + "SELECT value, key FROM Store WHERE keyspace = ? ORDER BY key, keyspace LIMIT ?"); + + public VoltTable[] run(String partKey, byte[] keyspace, byte[] rangeMin, int count) throws Exception { + if (rangeMin != null) { + voltQueueSQL(getBddStmt, keyspace, new String(rangeMin, "UTF-8"), count); + } else { + voltQueueSQL(getUnbddStmt, keyspace, count); + } + return voltExecuteSQL(true); + } +} diff --git a/voltdb/src/main/java/com/yahoo/ycsb/db/voltdb/procs/ScanAll.java b/voltdb/src/main/java/com/yahoo/ycsb/db/voltdb/procs/ScanAll.java new file mode 100644 index 0000000000..5096182520 --- /dev/null +++ b/voltdb/src/main/java/com/yahoo/ycsb/db/voltdb/procs/ScanAll.java @@ -0,0 +1,44 @@ +/** + * Copyright (c) 2015-2019 YCSB contributors. All rights reserved. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. You + * may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. See accompanying + * LICENSE file. + */ + +package com.yahoo.ycsb.db.voltdb.procs; + +import org.voltdb.SQLStmt; +import org.voltdb.VoltProcedure; +import org.voltdb.VoltTable; + +/** + * Query STORE using a multi partition query.. + * + */ +public class ScanAll extends VoltProcedure { + + private final SQLStmt getBddStmt = new SQLStmt( + "SELECT value, key FROM Store WHERE keyspace = ? AND key >= ? ORDER BY key, keyspace LIMIT ?"); + + private final SQLStmt getUnbddStmt = new SQLStmt( + "SELECT value, key FROM Store WHERE keyspace = ? ORDER BY key, keyspace LIMIT ?"); + + public VoltTable[] run(byte[] keyspace, byte[] rangeMin, int count) throws Exception { + if (rangeMin != null) { + voltQueueSQL(getBddStmt, keyspace, new String(rangeMin, "UTF-8"), count); + } else { + voltQueueSQL(getUnbddStmt, keyspace, count); + } + return voltExecuteSQL(true); + } +} diff --git a/voltdb/src/main/java/com/yahoo/ycsb/db/voltdb/procs/package-info.java b/voltdb/src/main/java/com/yahoo/ycsb/db/voltdb/procs/package-info.java new file mode 100644 index 0000000000..cd51b2d35d --- /dev/null +++ b/voltdb/src/main/java/com/yahoo/ycsb/db/voltdb/procs/package-info.java @@ -0,0 +1,25 @@ +/** + * Copyright (c) 2015-2019 YCSB contributors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. You + * may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. See accompanying + * LICENSE file. + */ + +/** + * VoltDB com.yahoo.ycsb.db.voltdb.procs for Put, Scan and ScanAll. + * Other com.yahoo.ycsb.db.voltdb.procs are defined using DDL. + * + * ByteWrapper is a utility class, not a procedure. + */ +package com.yahoo.ycsb.db.voltdb.procs; + diff --git a/voltdb/src/main/java/com/yahoo/ycsb/db/voltdb/sortedvolttable/ClientResponseIsBadException.java b/voltdb/src/main/java/com/yahoo/ycsb/db/voltdb/sortedvolttable/ClientResponseIsBadException.java new file mode 100644 index 0000000000..d25100574d --- /dev/null +++ b/voltdb/src/main/java/com/yahoo/ycsb/db/voltdb/sortedvolttable/ClientResponseIsBadException.java @@ -0,0 +1,29 @@ +package com.yahoo.ycsb.db.voltdb.sortedvolttable; + +/** + * Copyright (c) 2015-2019 YCSB contributors. All rights reserved. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. You + * may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. See accompanying + * LICENSE file. + */ + +/** + */ +@SuppressWarnings("serial") +public class ClientResponseIsBadException extends Exception { + + public ClientResponseIsBadException(String string) { + super(string); + } + +} diff --git a/voltdb/src/main/java/com/yahoo/ycsb/db/voltdb/sortedvolttable/IncomingVoltTablesNeedToBeSortedException.java b/voltdb/src/main/java/com/yahoo/ycsb/db/voltdb/sortedvolttable/IncomingVoltTablesNeedToBeSortedException.java new file mode 100644 index 0000000000..bc51467549 --- /dev/null +++ b/voltdb/src/main/java/com/yahoo/ycsb/db/voltdb/sortedvolttable/IncomingVoltTablesNeedToBeSortedException.java @@ -0,0 +1,29 @@ +package com.yahoo.ycsb.db.voltdb.sortedvolttable; + +/** + * Copyright (c) 2015-2019 YCSB contributors. All rights reserved. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. You + * may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. See accompanying + * LICENSE file. + */ + +/** + */ +@SuppressWarnings("serial") +public class IncomingVoltTablesNeedToBeSortedException extends Exception { + + public IncomingVoltTablesNeedToBeSortedException(String string) { + super(string); + } + +} diff --git a/voltdb/src/main/java/com/yahoo/ycsb/db/voltdb/sortedvolttable/NeedsToBeComparableException.java b/voltdb/src/main/java/com/yahoo/ycsb/db/voltdb/sortedvolttable/NeedsToBeComparableException.java new file mode 100644 index 0000000000..cf7f7fc674 --- /dev/null +++ b/voltdb/src/main/java/com/yahoo/ycsb/db/voltdb/sortedvolttable/NeedsToBeComparableException.java @@ -0,0 +1,32 @@ +package com.yahoo.ycsb.db.voltdb.sortedvolttable; + +/** + * Copyright (c) 2015-2019 YCSB contributors. All rights reserved. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. You + * may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. See accompanying + * LICENSE file. + */ + +/** + */ +@SuppressWarnings("serial") +public class NeedsToBeComparableException extends Exception { + + /** + * @param string + */ + public NeedsToBeComparableException(String string) { + super(string); + } + +} diff --git a/voltdb/src/main/java/com/yahoo/ycsb/db/voltdb/sortedvolttable/VoltDBTableSortedMergeWrangler.java b/voltdb/src/main/java/com/yahoo/ycsb/db/voltdb/sortedvolttable/VoltDBTableSortedMergeWrangler.java new file mode 100644 index 0000000000..c40c3aaaf4 --- /dev/null +++ b/voltdb/src/main/java/com/yahoo/ycsb/db/voltdb/sortedvolttable/VoltDBTableSortedMergeWrangler.java @@ -0,0 +1,205 @@ +package com.yahoo.ycsb.db.voltdb.sortedvolttable; + +/** + * Copyright (c) 2015-2019 YCSB contributors. All rights reserved. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. You + * may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. See accompanying + * LICENSE file. + */ + +import org.voltdb.VoltTable; +import org.voltdb.VoltType; +import org.voltdb.client.ClientResponse; +import org.voltdb.client.ClientResponseWithPartitionKey; + +/** + * VoltDBTableSortedMergeWrangler allows you to merge an array of VoltTable + * provided by callAllPartitionProcedure. + * + * The intended use case is for when you need to issue a multi partition query + * but would prefer not to, as you don't need perfect read consistency and would + * rather get the individual VoltDB partitions to issue the query independently + * and then somehow merge the results. + * + */ +public class VoltDBTableSortedMergeWrangler { + + private ClientResponseWithPartitionKey[] theTables = null; + @SuppressWarnings("rawtypes") + private Comparable whatWeSelectedLastTime = null; + + public VoltDBTableSortedMergeWrangler(ClientResponseWithPartitionKey[] response) { + super(); + this.theTables = response; + } + + /** + * Takes 'theTables' and merges them based on column 'columnId'. We assume that + * column 'columnId' in each element of 'theTables' is correctly sorted within + * itself. + * + * @param columnid + * @param limit How many rows we want + * @return A new VoltTable. + * @throws NeedsToBeComparableException - if column columnId doesn't implement Comparable. + * @throws IncomingVoltTablesNeedToBeSortedException - incoming data isn't already sorted. + * @throws ClientResponseIsBadException - The procedure worked but is complaining. + */ + public VoltTable getSortedTable(int columnid, int limit) + throws NeedsToBeComparableException, IncomingVoltTablesNeedToBeSortedException, ClientResponseIsBadException { + + whatWeSelectedLastTime = null; + + // Create an empty output table + VoltTable outputTable = new VoltTable(theTables[0].response.getResults()[0].getTableSchema()); + + // make sure our input tables are usable, and ready to be read from the + // start + for (int i = 0; i < theTables.length; i++) { + VoltTable currentTable = theTables[i].response.getResults()[0]; + + if (theTables[i].response.getStatus() != ClientResponse.SUCCESS) { + throw new ClientResponseIsBadException(i + " " + theTables[i].response.getStatusString()); + } + + currentTable.resetRowPosition(); + currentTable.advanceRow(); + } + + // Find table with lowest value for columnId, which is supposed to be + // the sort key. + int lowestId = getLowestId(columnid); + + // Loop until we run out of data or get 'limit' rows. + while (lowestId > -1 && outputTable.getRowCount() < limit) { + + // having identified the lowest Table pull that row, add it to + // the output table, and then call 'advanceRow' so we can do this + // again... + VoltTable lowestTable = theTables[lowestId].response.getResults()[0]; + outputTable.add(lowestTable.cloneRow()); + lowestTable.advanceRow(); + + // Find table with lowest value for columnId + lowestId = getLowestId(columnid); + } + + return outputTable; + } + + /** + * This routine looks at column 'columnId' in an array of VoltTable and + * identifies which one is lowest. Note that as we call 'advanceRow' elsewhere + * this will change. + * + * @param columnid + * @return the VoltTable with the lowest value for column 'columnId'. or -1 if + * we've exhausted all the VoltTables. + * @throws NeedsToBeComparableException + * @throws IncomingVoltTablesNeedToBeSortedException + */ + @SuppressWarnings({ "rawtypes", "unchecked" }) + private int getLowestId(int columnid) throws NeedsToBeComparableException, IncomingVoltTablesNeedToBeSortedException { + + int lowestId = -1; + Comparable lowestObservedValue = null; + + for (int i = 0; i < theTables.length; i++) { + + VoltTable currentTable = theTables[i].response.getResults()[0]; + + int activeRowIndex = currentTable.getActiveRowIndex(); + int rowCount = currentTable.getRowCount(); + + if (activeRowIndex > -1 && activeRowIndex < rowCount) { + + if (lowestObservedValue == null) { + + lowestId = i; + lowestObservedValue = getComparable(currentTable, columnid); + + } else { + Comparable newObservedValue = getComparable(currentTable, columnid); + + if (newObservedValue.compareTo(lowestObservedValue) <= 0) { + lowestId = i; + + lowestObservedValue = getComparable(currentTable, columnid); + } + } + + } + } + + // If we found something make sure that the data in columnid was sorted + // properly when it was retrieved. + if (lowestId > -1) { + Comparable latestItemWeSelected = getComparable(theTables[lowestId].response.getResults()[0], columnid); + + if (whatWeSelectedLastTime != null && latestItemWeSelected.compareTo(whatWeSelectedLastTime) < 0) { + throw new IncomingVoltTablesNeedToBeSortedException( + "Latest Item '" + latestItemWeSelected + "' is before last item '" + whatWeSelectedLastTime + "'"); + } + + whatWeSelectedLastTime = latestItemWeSelected; + } + + return lowestId; + + } + + /** + * Get the value we're working with as a Comparable. + * + * @param theTable + * @param columnId + * @return a Comparable. + * @throws NeedsToBeComparableException + */ + @SuppressWarnings("rawtypes") + private Comparable getComparable(VoltTable theTable, int columnId) throws NeedsToBeComparableException { + Comparable c = null; + + VoltType vt = theTable.getColumnType(columnId); + Object theValue = theTable.get(columnId, vt); + + if (theValue instanceof Comparable) { + c = (Comparable) theValue; + } else { + throw new NeedsToBeComparableException( + theValue + ": Only Comparables are supported by VoltDBTableSortedMergeWrangler"); + } + + return c; + } + + /** + * Do a comparison of byte arrays. Not used right now, but will be when we added + * support for VARBINARY. + * + * @param left + * @param right + * @return whether 'left' is <, >, or = 'right' + */ + private int compare(byte[] left, byte[] right) { + for (int i = 0, j = 0; i < left.length && j < right.length; i++, j++) { + + int a = (left[i] & 0xff); + int b = (right[j] & 0xff); + if (a != b) { + return a - b; + } + } + return left.length - right.length; + } +} diff --git a/voltdb/src/main/java/com/yahoo/ycsb/db/voltdb/sortedvolttable/package-info.java b/voltdb/src/main/java/com/yahoo/ycsb/db/voltdb/sortedvolttable/package-info.java new file mode 100644 index 0000000000..8b93d6224e --- /dev/null +++ b/voltdb/src/main/java/com/yahoo/ycsb/db/voltdb/sortedvolttable/package-info.java @@ -0,0 +1,28 @@ +/** + * Copyright (c) 2015-2019 YCSB contributors. All rights reserved. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. You + * may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. See accompanying + * LICENSE file. + */ +/** + * + * VoltDBTableSortedMergeWrangler allows you to merge an array of VoltTable + * provided by callAllPartitionProcedure. + * + * The intended use case is for when you need to issue a multi partition query + * but would prefer not to, as you don't need perfect read consistency and would + * rather get the individual VoltDB partitions to issue the query independently + * and then somehow merge the results. + * + */ +package com.yahoo.ycsb.db.voltdb.sortedvolttable; diff --git a/voltdb/src/test/java/com/yahoo/ycsb/db/voltdb/test/VoltDBClientTest.java b/voltdb/src/test/java/com/yahoo/ycsb/db/voltdb/test/VoltDBClientTest.java new file mode 100644 index 0000000000..1e91e615cf --- /dev/null +++ b/voltdb/src/test/java/com/yahoo/ycsb/db/voltdb/test/VoltDBClientTest.java @@ -0,0 +1,455 @@ +/** + * Copyright (c) 2015-2019 YCSB contributors. All rights reserved. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. You + * may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. See accompanying + * LICENSE file. + */ + +package com.yahoo.ycsb.db.voltdb.test; + +import static org.junit.Assert.*; +import static org.junit.Assume.assumeNoException; + +import com.yahoo.ycsb.ByteIterator; +import com.yahoo.ycsb.DBException; +import com.yahoo.ycsb.Status; +import com.yahoo.ycsb.StringByteIterator; +import com.yahoo.ycsb.db.voltdb.ConnectionHelper; +import com.yahoo.ycsb.db.voltdb.VoltClient4; + +import org.junit.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.Map; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Set; +import java.util.Vector; +import java.util.Properties; + +/** + * Test harness for YCSB / VoltDB. Note that not much happens if VoltDB isn't + * visible. + * + */ +public class VoltDBClientTest { + + private static final String TABLE_NAME = "USERTABLE"; + private static final int FIELD_LENGTH = 32; + private static final String FIELD_PREFIX = "FIELD"; + private static final int NUM_FIELDS = 3; + + private static final String INSERT_TEST_KEY = "InsertReadTest"; + private static final String INSERT_DELETE_AND_READ_TEST_KEY = "InsertDeleteReadTest"; + private static final String UPDATE_TEST_KEY = "UpdateTest"; + private static final String NON_EXISTENT_KEY = "NonExistTest"; + + private static final String SCAN_KEY_PREFIX = "ScanKey_"; + private static final int SCAN_RECORD_COUNT = 5000; + + private static final String[] TEST_DATA_KEYS = { INSERT_TEST_KEY, INSERT_DELETE_AND_READ_TEST_KEY, UPDATE_TEST_KEY }; + + private static VoltClient4 voltClient = null; + private static boolean haveDb = false; + + @BeforeClass + public static void setup() { + + Properties p = new Properties(); + + String servers = p.getProperty("voltdb.servers", "localhost"); + String user = p.getProperty("voltdb.user", ""); + String password = p.getProperty("voltdb.password", ""); + String strLimit = p.getProperty("voltdb.ratelimit", "70000"); + + p.setProperty("voltdb.servers", servers); + p.setProperty("voltdb.user", user); + p.setProperty("voltdb.password", password); + p.setProperty("voltdb.ratelimit", strLimit); + + try { + voltClient = new VoltClient4(); + voltClient.setProperties(p); + + if (ConnectionHelper.checkDBServers(servers)) { + voltClient.init(); + haveDb = true; + removeExistingData(); + } + + } catch (Exception e) { + // The call to checkDBServers above looks for activity on + // the ip and port we expect VoltDB to be on. If we get to this + // line it's because 'something' is running on localhost:21212, + // but whatever it is, it isn't a happy copy of VoltDB. + assumeNoException("Something was running on VoltDB's port but it wasn't a usable copy of VoltDB", e); + } + + } + + private static void removeExistingData() { + + try { + for (int i = 0; i < TEST_DATA_KEYS.length; i++) { + voltClient.delete(TABLE_NAME, TEST_DATA_KEYS[i]); + } + + for (int i = 0; i < SCAN_RECORD_COUNT; i++) { + voltClient.delete(TABLE_NAME, SCAN_KEY_PREFIX + i); + } + + } catch (Exception e) { + Logger logger = LoggerFactory.getLogger(VoltDBClientTest.class); + logger.error("Error while calling 'removeExistingData()'", e); + fail("Failed removeExistingData"); + } + } + + @AfterClass + public static void teardown() { + + try { + if (voltClient != null && haveDb) { + removeExistingData(); + voltClient.cleanup(); + } + } catch (DBException e) { + e.printStackTrace(); + } + + } + + @Before + public void prepareTest() { + } + + private boolean compareContents(HashMap inMsg, Map outMsg) { + + if (inMsg == null) { + return false; + } + + if (outMsg == null) { + return false; + } + + if (inMsg.size() != outMsg.size()) { + return false; + } + + @SuppressWarnings("rawtypes") + Iterator it = inMsg.entrySet().iterator(); + while (it.hasNext()) { + @SuppressWarnings("rawtypes") + Map.Entry pair = (Map.Entry) it.next(); + String key = (String) pair.getKey(); + ByteIterator inPayload = inMsg.get(key); + inPayload.reset(); + ByteIterator outPayload = outMsg.get(key); + outPayload.reset(); + + if (inPayload.bytesLeft() != outPayload.bytesLeft()) { + return false; + } + + while (inPayload.hasNext()) { + byte inByte = inPayload.nextByte(); + byte outByte = outPayload.nextByte(); + if (inByte != outByte) { + return false; + } + } + + it.remove(); + } + + return true; + + } + + @Test + public void insertAndReadTest() { + + Assume.assumeTrue(haveDb); + + try { + + // Create some test data + final String insertKey = INSERT_TEST_KEY; + final Set columns = getColumnNameMap(); + + // Insert row + HashMap insertMap = new HashMap(); + for (int i = 0; i < NUM_FIELDS; i++) { + insertMap.put(FIELD_PREFIX + i, new StringByteIterator(buildDeterministicValue(insertKey, FIELD_PREFIX + i))); + } + voltClient.insert(TABLE_NAME, insertKey, insertMap); + + // Create a object to put retrieved row in... + Map testResult = new HashMap(); + + // Read row... + Status s = voltClient.read(TABLE_NAME, insertKey, columns, testResult); + + if (!s.equals(Status.OK)) { + fail("Didn't get OK on read."); + } + + if (!compareContents(insertMap, testResult)) { + fail("Returned data not the same as inserted data"); + } + + } catch (Exception e) { + e.printStackTrace(); + fail("Failed insertTest"); + } + } + + @Test + public void insertDeleteAndReadTest() { + + Assume.assumeTrue(haveDb); + + try { + + // Create some test data + final String insertKey = INSERT_DELETE_AND_READ_TEST_KEY; + final Set columns = getColumnNameMap(); + + // Insert row + HashMap insertMap = new HashMap(); + for (int i = 0; i < NUM_FIELDS; i++) { + insertMap.put(FIELD_PREFIX + i, new StringByteIterator(buildDeterministicValue(insertKey, FIELD_PREFIX + i))); + } + voltClient.insert(TABLE_NAME, insertKey, insertMap); + + // Create a object to put retrieved row in... + Map testResult = new HashMap(); + + // Read row... + Status s = voltClient.read(TABLE_NAME, insertKey, columns, testResult); + + if (!s.equals(Status.OK)) { + fail("Didn't get OK on read."); + } + + if (!compareContents(insertMap, testResult)) { + fail("Returned data not the same as inserted data"); + } + + voltClient.delete(TABLE_NAME, insertKey); + + // Create another object to put retrieved row in... + Map testResultAfterDelete = new HashMap(); + + // Read row... + voltClient.read(TABLE_NAME, insertKey, columns, testResultAfterDelete); + + if (testResultAfterDelete.size() > 0) { + fail("testResultAfterDelete has value."); + } + + } catch (Exception e) { + e.printStackTrace(); + fail("Failed insertDeleteAndReadTest"); + } + } + + @Test + public void deleteNonExistentRecordTest() { + + Assume.assumeTrue(haveDb); + + try { + + // Create some test data + final String insertKey = NON_EXISTENT_KEY; + final Set columns = getColumnNameMap(); + + // Create a object to put retrieved row in... + Map testResult = new HashMap(); + + // Read row... + voltClient.read(TABLE_NAME, insertKey, columns, testResult); + + if (testResult.size() > 0) { + fail("testResult.size() > 0."); + } + + } catch (Exception e) { + e.printStackTrace(); + fail("Failed deleteNonExistentRecordTest"); + } + } + + @Test + public void scanReadTest() { + + Assume.assumeTrue(haveDb); + + try { + + for (int z = 0; z < SCAN_RECORD_COUNT; z++) { + // Create some test data + final String insertKey = SCAN_KEY_PREFIX + z; + + // Insert row + HashMap insertMap = new HashMap(); + for (int i = 0; i < NUM_FIELDS; i++) { + insertMap.put(FIELD_PREFIX + i, new StringByteIterator("Data for " + SCAN_KEY_PREFIX + z + " element " + i)); + } + voltClient.insert(TABLE_NAME, insertKey, insertMap); + } + + final String firstInsertKey = SCAN_KEY_PREFIX + 0; + final String lastInsertKey = SCAN_KEY_PREFIX + (SCAN_RECORD_COUNT - 1); + final String beyondLastInsertKey = SCAN_KEY_PREFIX + (SCAN_RECORD_COUNT + 1); + final String oneHundredFromEndInsertKey = SCAN_KEY_PREFIX + (SCAN_RECORD_COUNT - 101); + final String fiftyFromEndInsertKey = SCAN_KEY_PREFIX + (SCAN_RECORD_COUNT - 101); + + // test non existent records + singleScanReadTest(NON_EXISTENT_KEY, 1000, 0, NON_EXISTENT_KEY); + + // test single record + singleScanReadTest(firstInsertKey, 1, 1, firstInsertKey); + + // test scan of SCAN_RECORD_COUNT records + singleScanReadTest(firstInsertKey, SCAN_RECORD_COUNT, SCAN_RECORD_COUNT, lastInsertKey); + + // test single record in middle + singleScanReadTest(oneHundredFromEndInsertKey, 1, 1, oneHundredFromEndInsertKey); + + // test request of 100 starting 50 from end. + singleScanReadTest(fiftyFromEndInsertKey, 100, 50, lastInsertKey); + + // test request of 100 starting beyond the end + singleScanReadTest(beyondLastInsertKey, 100, 0, lastInsertKey); + + } catch (Exception e) { + e.printStackTrace(); + fail("Failed scanReadTest"); + } + + } + + private void singleScanReadTest(String startKey, int requestedCount, int expectedCount, String lastKey) { + + Assume.assumeTrue(haveDb); + + try { + + final Set columns = getColumnNameMap(); + + // Create a object to put retrieved row in... + Vector> testResult = new Vector>(); + + // Read row... + Status s = voltClient.scan(TABLE_NAME, startKey, expectedCount, columns, testResult); + + if (!s.equals(Status.OK)) { + fail("Didn't get OK on read."); + } + + if (testResult.size() != expectedCount) { + fail("Failed singleScanReadTest " + startKey + " " + expectedCount + " " + lastKey); + } + + } catch (Exception e) { + e.printStackTrace(); + fail("Failed singleScanReadTest " + startKey + ". Asked for " + requestedCount + ", expected " + expectedCount + + " lastkey=" + lastKey); + } + } + + @Test + public void updateTest() { + + Assume.assumeTrue(haveDb); + + try { + + // Create some test data + final String insertKey = UPDATE_TEST_KEY; + + // Insert row + // Insert row + HashMap insertThenUpdateMap = new HashMap(); + for (int i = 0; i < NUM_FIELDS; i++) { + insertThenUpdateMap.put(FIELD_PREFIX + i, + new StringByteIterator(buildDeterministicValue(insertKey, FIELD_PREFIX + i))); + } + voltClient.insert(TABLE_NAME, insertKey, insertThenUpdateMap); + + // Change the data we inserted... + for (int i = 0; i < NUM_FIELDS; i++) { + insertThenUpdateMap.put(FIELD_PREFIX + i, new StringByteIterator(FIELD_PREFIX + i + " has changed")); + } + + // now do an update + voltClient.update(TABLE_NAME, insertKey, insertThenUpdateMap); + + // Create a object to put retrieved row in... + final Set columns = getColumnNameMap(); + Map testResult = new HashMap(); + + // Read row... + Status s = voltClient.read(TABLE_NAME, insertKey, columns, testResult); + + if (!s.equals(Status.OK)) { + fail("Didn't get OK on read."); + } + + if (!compareContents(insertThenUpdateMap, testResult)) { + fail("Returned data not the same as inserted data"); + } + + } catch (Exception e) { + e.printStackTrace(); + fail("Failed updateTest"); + } + } + + /** + * @return + */ + private Set getColumnNameMap() { + Set columns = new HashSet(); + for (int i = 0; i < NUM_FIELDS; i++) { + columns.add(FIELD_PREFIX + i); + } + return columns; + } + + /* + * This is a copy of buildDeterministicValue() from + * core:com.yahoo.ycsb.workloads.CoreWorkload.java. That method is neither + * public nor static so we need a copy. + */ + private String buildDeterministicValue(String key, String fieldkey) { + int size = FIELD_LENGTH; + StringBuilder sb = new StringBuilder(size); + sb.append(key); + sb.append(':'); + sb.append(fieldkey); + while (sb.length() < size) { + sb.append(':'); + sb.append(sb.toString().hashCode()); + } + sb.setLength(size); + + return sb.toString(); + } + +} diff --git a/voltdb/src/test/java/com/yahoo/ycsb/db/voltdb/test/package-info.java b/voltdb/src/test/java/com/yahoo/ycsb/db/voltdb/test/package-info.java new file mode 100644 index 0000000000..066c9a7888 --- /dev/null +++ b/voltdb/src/test/java/com/yahoo/ycsb/db/voltdb/test/package-info.java @@ -0,0 +1,22 @@ +/** + * Copyright (c) 2015-2019 YCSB contributors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you + * may not use this file except in compliance with the License. You + * may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. See accompanying + * LICENSE file. + */ + +/** + * YCSB binding for VoltDB. + */ +package com.yahoo.ycsb.db.voltdb.test; +