From 2fbb6bc3ffc033d981af2f494a077a0cbb689a99 Mon Sep 17 00:00:00 2001 From: Sami Salonen Date: Sun, 28 Apr 2019 13:29:05 +0300 Subject: [PATCH 1/4] [dynamodb] latest changes synced from openhab1-addons repo In addition, removed dependency to com.google.common.collect. Corresponding PRs: #5847 and #5826 Signed-off-by: Sami Salonen --- .../README.md | 64 ++- .../AbstractBufferedPersistenceService.java | 103 +++++ .../dynamodb/internal/DynamoDBConfig.java | 37 +- .../internal/DynamoDBPersistenceService.java | 425 ++++++++++-------- .../dynamodb/internal/DynamoDBQueryUtils.java | 129 ++++++ ...AbstractDynamoDBItemSerializationTest.java | 44 +- .../internal/BaseIntegrationTest.java | 5 +- .../dynamodb/internal/DynamoDBConfigTest.java | 86 +++- 8 files changed, 655 insertions(+), 238 deletions(-) create mode 100644 bundles/org.openhab.persistence.dynamodb/src/main/java/org/openhab/persistence/dynamodb/internal/AbstractBufferedPersistenceService.java create mode 100644 bundles/org.openhab.persistence.dynamodb/src/main/java/org/openhab/persistence/dynamodb/internal/DynamoDBQueryUtils.java diff --git a/bundles/org.openhab.persistence.dynamodb/README.md b/bundles/org.openhab.persistence.dynamodb/README.md index faea9aa7d13ca..28caf973f9690 100644 --- a/bundles/org.openhab.persistence.dynamodb/README.md +++ b/bundles/org.openhab.persistence.dynamodb/README.md @@ -55,21 +55,21 @@ This service can be configured in the file `services/dynamodb.cfg`. ### Basic configuration -| Property | Default | Required | Description | -|----------|---------|:--------:|-------------| -| accessKey | | Yes | access key as shown in [Setting up Amazon account](#setting-up-amazon-account). | -| secretKey | | Yes | secret key as shown in [Setting up Amazon account](#setting-up-amazon-account). | -| region | | Yes | AWS region ID as described in [Setting up Amazon account](#setting-up-amazon-account). The region needs to match the region that was used to create the user. 
| +| Property | Default | Required | Description | +| --------- | ------- | :------: | ------------------------------------------------------------------------------------------------------------------------------------------------------------- | +| accessKey | | Yes | access key as shown in [Setting up Amazon account](#setting-up-amazon-account). | +| secretKey | | Yes | secret key as shown in [Setting up Amazon account](#setting-up-amazon-account). | +| region | | Yes | AWS region ID as described in [Setting up Amazon account](#setting-up-amazon-account). The region needs to match the region that was used to create the user. | ### Configuration Using Credentials File Alternatively, instead of specifying `accessKey` and `secretKey`, one can configure a configuration profile file. -| Property | Default | Required | Description | -|----------|---------|:--------:|-------------| -| profilesConfigFile | | Yes | path to the credentials file. For example, `/etc/openhab2/aws_creds`. Please note that the user that runs openHAB must have approriate read rights to the credential file. For more details on the Amazon credential file format, see [Amazon documentation](https://docs.aws.amazon.com/cli/latest/userguide/cli-chap-getting-started.html). | -| profile | | Yes | name of the profile to use | -| region | | Yes | AWS region ID as described in Step 2 in [Setting up Amazon account](#setting-up-amazon-account). The region needs to match the region that was used to create the user. | +| Property | Default | Required | Description | +| ------------------ | ------- | :------: | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | +| profilesConfigFile | | Yes | path to the credentials file. 
For example, `/etc/openhab2/aws_creds`. Please note that the user that runs openHAB must have appropriate read rights to the credentials file. For more details on the Amazon credentials file format, see the [Amazon documentation](https://docs.aws.amazon.com/cli/latest/userguide/cli-chap-getting-started.html). |
+| profile | | Yes | name of the profile to use |
+| region | | Yes | AWS region ID as described in Step 2 of [Setting up Amazon account](#setting-up-amazon-account). The region needs to match the region that was used to create the user. |

Example of service configuration file (`services/dynamodb.cfg`):

@@ -91,11 +91,15 @@ aws_secret_access_key=testSecretKey

In addition to the configuration properties above, the following are also available:

-| Property | Default | Required | Description |
-|----------|---------|:--------:|-------------|
-| readCapacityUnits | 1 | No | read capacity for the created tables |
-| writeCapacityUnits | 1 | No | write capacity for the created tables |
-| tablePrefix | `openhab-` | No | table prefix used in the name of created tables |
+| Property | Default | Required | Description |
+| -------------------------- | ---------- | :------: | -------------------------------------------------------------------------------------------------- |
+| readCapacityUnits | 1 | No | read capacity for the created tables |
+| writeCapacityUnits | 1 | No | write capacity for the created tables |
+| tablePrefix | `openhab-` | No | table prefix used in the name of created tables |
+| bufferCommitIntervalMillis | 1000 | No | Interval, in milliseconds, at which buffered data is committed (written) to DynamoDB. |
+| bufferSize | 1000 | No | Internal buffer size used to batch writes to DynamoDB every `bufferCommitIntervalMillis`. |
+
+Typically you should not need to modify the parameters related to buffering.
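For illustration, buffering could be tuned, or disabled entirely, by adding the properties to `services/dynamodb.cfg`; the values below are examples, not recommendations:

```
# flush the buffer every 5 seconds instead of the default 1 second
bufferCommitIntervalMillis=5000
# hold up to 5000 datapoints in memory; bufferSize=0 disables buffering
bufferSize=5000
```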
Refer to Amazon documentation on [provisioned throughput](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/HowItWorks.ProvisionedThroughput.html) for details on read/write capacity.

@@ -105,10 +109,20 @@ All item- and event-related configuration is done in the file `persistence/dynam

### Tables Creation

-When an item is persisted via this service, a table is created (if necessary). Currently, the service will create at most two tables for different item types. The tables will be named `<tablePrefix><item-type>`, where `<tablePrefix>` matches the `tablePrefix` configuration property; while the `<item-type>` is either `bigdecimal` (numeric items) or `string` (string and complex items).
+When an item is persisted via this service, a table is created (if necessary). Currently, the service will create at most two tables for different item types. The tables will be named `<tablePrefix><item-type>`, where the `<item-type>` is either `bigdecimal` (numeric items) or `string` (string and complex items).

Each table will have three columns: `itemname` (item name), `timeutc` (in ISO 8601 format with millisecond accuracy), and `itemstate` (either a number or string representing item state).

+### Buffering
+
+By default, the service is asynchronous, which means that data is not written immediately to DynamoDB but is instead buffered in memory.
+The size of the buffer, in terms of datapoints, can be configured with `bufferSize`.
+Every `bufferCommitIntervalMillis`, the whole buffer of data is flushed to DynamoDB.
+
+It is recommended to keep buffering enabled, since the synchronous behaviour (writing data immediately) might have an adverse impact on the whole system when many items are persisted at the same time. Buffering can be disabled by setting `bufferSize` to zero.
+
+The defaults should be suitable for many use cases.
+
### Caveats

When the tables are created, the read/write capacity is configured according to the configuration. However, the service does not modify the capacity of existing tables.
As a workaround, you can modify the read/write capacity of existing tables using the [Amazon console](https://aws.amazon.com/console/).

@@ -128,4 +142,20 @@ When the tables are created, the read/write capacity is configured according to

7. Generate `.classpath` entries `ls lib/*.jar | python -c "import sys;pre=''; print('\\t' + pre + (post + '\\n\\t' + pre).join(map(str.strip, sys.stdin.readlines())) + post)"`

-After these changes, it's good practice to run integration tests (against live AWS DynamoDB) in `org.openhab.persistence.dynamodb.test` bundle. See README.md in the test bundle for more information how to execute the tests.
\ No newline at end of file
+After these changes, it's good practice to run integration tests (against live AWS DynamoDB) in the `org.openhab.persistence.dynamodb.test` bundle. See README.md in the test bundle for more information on how to execute the tests.
+
+### Running integration tests
+
+To run the integration tests, one needs to provide AWS credentials.
+
+Eclipse instructions:
+1. Run all tests (in the package org.openhab.persistence.dynamodb.internal) as JUnit tests
+2. Open the run configuration, and go to the Arguments tab
+3. In VM arguments, provide the credentials for AWS
+````
+-DDYNAMODBTEST_REGION=REGION-ID
+-DDYNAMODBTEST_ACCESS=ACCESS-KEY
+-DDYNAMODBTEST_SECRET=SECRET
+````
+
+The tests will create tables with the prefix `dynamodb-integration-tests-`. Note that when the tests begin, all data is removed from those tables!
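The buffered store-and-flush scheme that the new `AbstractBufferedPersistenceService` (below) implements can be sketched roughly as follows. This is a simplified, stand-alone illustration, not the service's actual API; all names here are made up for the example:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Sketch of the buffering scheme: datapoints go into a bounded in-memory
// buffer, and a periodic task drains the whole buffer into one batch write.
public class BufferSketch {
    private final BlockingQueue<String> buffer;

    public BufferSketch(int bufferSize) {
        // the real service treats bufferSize == 0 as "write immediately"
        this.buffer = new ArrayBlockingQueue<>(Math.max(1, bufferSize), true);
    }

    // corresponds to store(): non-blocking offer, false when the buffer is full
    public boolean offer(String dataPoint) {
        return buffer.offer(dataPoint);
    }

    // corresponds to the flush that runs every bufferCommitIntervalMillis
    public List<String> flush() {
        List<String> batch = new ArrayList<>();
        buffer.drainTo(batch);
        return batch; // the real service batch-writes this to DynamoDB
    }

    public static void main(String[] args) {
        BufferSketch sketch = new BufferSketch(2);
        System.out.println(sketch.offer("21.5")); // true
        System.out.println(sketch.offer("21.7")); // true
        System.out.println(sketch.offer("21.9")); // false: full; the service flushes and retries
        System.out.println(sketch.flush());       // [21.5, 21.7]
    }
}
```

When the buffer fills between commits, `offer` fails and the real service flushes immediately and retries, which is why the README suggests increasing `bufferSize` in that case.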
diff --git a/bundles/org.openhab.persistence.dynamodb/src/main/java/org/openhab/persistence/dynamodb/internal/AbstractBufferedPersistenceService.java b/bundles/org.openhab.persistence.dynamodb/src/main/java/org/openhab/persistence/dynamodb/internal/AbstractBufferedPersistenceService.java new file mode 100644 index 0000000000000..bf9208559e2e8 --- /dev/null +++ b/bundles/org.openhab.persistence.dynamodb/src/main/java/org/openhab/persistence/dynamodb/internal/AbstractBufferedPersistenceService.java @@ -0,0 +1,103 @@ +package org.openhab.persistence.dynamodb.internal; + +import java.util.Date; +import java.util.UUID; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; + +import org.openhab.core.items.Item; +import org.openhab.core.persistence.PersistenceService; +import org.openhab.core.types.State; +import org.openhab.core.types.UnDefType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public abstract class AbstractBufferedPersistenceService implements PersistenceService { + + private static final long BUFFER_OFFER_TIMEOUT_MILLIS = 500; + + private final Logger logger = LoggerFactory.getLogger(AbstractBufferedPersistenceService.class); + protected BlockingQueue buffer; + + private boolean writeImmediately; + + protected void resetWithBufferSize(int bufferSize) { + int capacity = Math.max(1, bufferSize); + buffer = new ArrayBlockingQueue(capacity, true); + writeImmediately = bufferSize == 0; + } + + protected abstract T persistenceItemFromState(String name, State state, Date time); + + protected abstract boolean isReadyToStore(); + + protected abstract void flushBufferedData(); + + @Override + public void store(Item item) { + store(item, null); + } + + @Override + public void store(Item item, String alias) { + long storeStart = System.currentTimeMillis(); + String uuid = UUID.randomUUID().toString(); + if (item.getState() instanceof UnDefType) { + logger.debug("Undefined 
item state received. Not storing item {}.", item.getName()); + return; + } + if (!isReadyToStore()) { + return; + } + if (buffer == null) { + throw new IllegalStateException("Buffer not initialized with resetWithBufferSize. Bug?"); + } + Date time = new Date(storeStart); + String realName = item.getName(); + String name = (alias != null) ? alias : realName; + State state = item.getState(); + T persistenceItem = persistenceItemFromState(name, state, time); + logger.trace("store() called with item {}, which was converted to {} [{}]", item, persistenceItem, uuid); + if (writeImmediately) { + logger.debug("Writing immediately item {} [{}]", realName, uuid); + // We want to write everything immediately + // Synchronous behaviour to ensure buffer does not get full. + synchronized (this) { + boolean buffered = addToBuffer(persistenceItem); + assert buffered; + flushBufferedData(); + } + } else { + long bufferStart = System.currentTimeMillis(); + boolean buffered = addToBuffer(persistenceItem); + if (buffered) { + logger.debug("Buffered item {} in {} ms. Total time for store(): {} [{}]", realName, + System.currentTimeMillis() - bufferStart, System.currentTimeMillis() - storeStart, uuid); + } else { + logger.debug( + "Buffer is full. Writing buffered data immediately and trying again. Consider increasing bufferSize"); + // Buffer is full, commit it immediately + flushBufferedData(); + boolean buffered2 = addToBuffer(persistenceItem); + if (buffered2) { + logger.debug("Buffered item in {} ms (2nd try, flushed buffer in-between) [{}]", + System.currentTimeMillis() - bufferStart, uuid); + } else { + // The unlikely case happened -- buffer got full again immediately + logger.warn("Buffering failed for the second time -- Too small bufferSize? 
Discarding data [{}]", + uuid); + } + } + } + } + + protected boolean addToBuffer(T persistenceItem) { + try { + return buffer.offer(persistenceItem, BUFFER_OFFER_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + logger.warn("Interrupted when trying to buffer data! Dropping data"); + return false; + } + } +} diff --git a/bundles/org.openhab.persistence.dynamodb/src/main/java/org/openhab/persistence/dynamodb/internal/DynamoDBConfig.java b/bundles/org.openhab.persistence.dynamodb/src/main/java/org/openhab/persistence/dynamodb/internal/DynamoDBConfig.java index dd8b308a8a40f..e23776ab0c013 100644 --- a/bundles/org.openhab.persistence.dynamodb/src/main/java/org/openhab/persistence/dynamodb/internal/DynamoDBConfig.java +++ b/bundles/org.openhab.persistence.dynamodb/src/main/java/org/openhab/persistence/dynamodb/internal/DynamoDBConfig.java @@ -31,6 +31,8 @@ public class DynamoDBConfig { public static final boolean DEFAULT_CREATE_TABLE_ON_DEMAND = true; public static final long DEFAULT_READ_CAPACITY_UNITS = 1; public static final long DEFAULT_WRITE_CAPACITY_UNITS = 1; + public static final long DEFAULT_BUFFER_COMMIT_INTERVAL_MILLIS = 1000; + public static final int DEFAULT_BUFFER_SIZE = 1000; private static final Logger logger = LoggerFactory.getLogger(DynamoDBConfig.class); @@ -40,6 +42,8 @@ public class DynamoDBConfig { private boolean createTable = DEFAULT_CREATE_TABLE_ON_DEMAND; private long readCapacityUnits = DEFAULT_READ_CAPACITY_UNITS; private long writeCapacityUnits = DEFAULT_WRITE_CAPACITY_UNITS; + private long bufferCommitIntervalMillis = DEFAULT_BUFFER_COMMIT_INTERVAL_MILLIS; + private int bufferSize = DEFAULT_BUFFER_SIZE; /** * @@ -117,7 +121,26 @@ public static DynamoDBConfig fromConfig(Map config) { writeCapacityUnits = Long.parseLong(writeCapacityUnitsParam); } - return new DynamoDBConfig(region, credentials, table, createTable, readCapacityUnits, writeCapacityUnits); + final long bufferCommitIntervalMillis; + String 
bufferCommitIntervalMillisParam = (String) config.get("bufferCommitIntervalMillis"); + if (isBlank(bufferCommitIntervalMillisParam)) { + logger.debug("Buffer commit interval millis: {}", DEFAULT_BUFFER_COMMIT_INTERVAL_MILLIS); + bufferCommitIntervalMillis = DEFAULT_BUFFER_COMMIT_INTERVAL_MILLIS; + } else { + bufferCommitIntervalMillis = Long.parseLong(bufferCommitIntervalMillisParam); + } + + final int bufferSize; + String bufferSizeParam = (String) config.get("bufferSize"); + if (isBlank(bufferSizeParam)) { + logger.debug("Buffer size: {}", DEFAULT_BUFFER_SIZE); + bufferSize = DEFAULT_BUFFER_SIZE; + } else { + bufferSize = Integer.parseInt(bufferSizeParam); + } + + return new DynamoDBConfig(region, credentials, table, createTable, readCapacityUnits, writeCapacityUnits, + bufferCommitIntervalMillis, bufferSize); } catch (Exception e) { logger.error("Error with configuration", e); return null; @@ -125,13 +148,15 @@ public static DynamoDBConfig fromConfig(Map config) { } public DynamoDBConfig(Regions region, AWSCredentials credentials, String table, boolean createTable, - long readCapacityUnits, long writeCapacityUnits) { + long readCapacityUnits, long writeCapacityUnits, long bufferCommitIntervalMillis, int bufferSize) { this.region = region; this.credentials = credentials; this.tablePrefix = table; this.createTable = createTable; this.readCapacityUnits = readCapacityUnits; this.writeCapacityUnits = writeCapacityUnits; + this.bufferCommitIntervalMillis = bufferCommitIntervalMillis; + this.bufferSize = bufferSize; } public AWSCredentials getCredentials() { @@ -158,6 +183,14 @@ public long getWriteCapacityUnits() { return writeCapacityUnits; } + public long getBufferCommitIntervalMillis() { + return bufferCommitIntervalMillis; + } + + public int getBufferSize() { + return bufferSize; + } + private static void invalidRegionLogHelp(String region) { Regions[] regions = Regions.values(); String[] regionNames = new String[regions.length]; diff --git 
a/bundles/org.openhab.persistence.dynamodb/src/main/java/org/openhab/persistence/dynamodb/internal/DynamoDBPersistenceService.java b/bundles/org.openhab.persistence.dynamodb/src/main/java/org/openhab/persistence/dynamodb/internal/DynamoDBPersistenceService.java index e15e2fa372853..86d80a35ee78b 100644 --- a/bundles/org.openhab.persistence.dynamodb/src/main/java/org/openhab/persistence/dynamodb/internal/DynamoDBPersistenceService.java +++ b/bundles/org.openhab.persistence.dynamodb/src/main/java/org/openhab/persistence/dynamodb/internal/DynamoDBPersistenceService.java @@ -8,25 +8,31 @@ */ package org.openhab.persistence.dynamodb.internal; +import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Collections; import java.util.Date; +import java.util.Deque; +import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; import org.openhab.core.items.Item; import org.openhab.core.items.ItemNotFoundException; import org.openhab.core.items.ItemRegistry; -import org.openhab.core.library.types.OnOffType; -import org.openhab.core.library.types.OpenClosedType; import org.openhab.core.persistence.FilterCriteria; -import org.openhab.core.persistence.FilterCriteria.Operator; -import org.openhab.core.persistence.FilterCriteria.Ordering; import org.openhab.core.persistence.HistoricItem; import org.openhab.core.persistence.PersistenceService; import org.openhab.core.persistence.QueryablePersistenceService; import org.openhab.core.types.State; -import org.openhab.core.types.UnDefType; import org.osgi.framework.BundleContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,41 +40,115 @@ import 
com.amazonaws.AmazonClientException; import com.amazonaws.AmazonServiceException; import com.amazonaws.services.dynamodbv2.datamodeling.DynamoDBMapper; +import com.amazonaws.services.dynamodbv2.datamodeling.DynamoDBMapper.FailedBatch; import com.amazonaws.services.dynamodbv2.datamodeling.DynamoDBMapperConfig; import com.amazonaws.services.dynamodbv2.datamodeling.DynamoDBMapperConfig.PaginationLoadingStrategy; import com.amazonaws.services.dynamodbv2.datamodeling.DynamoDBQueryExpression; import com.amazonaws.services.dynamodbv2.datamodeling.PaginatedQueryList; -import com.amazonaws.services.dynamodbv2.model.AttributeValue; -import com.amazonaws.services.dynamodbv2.model.ComparisonOperator; -import com.amazonaws.services.dynamodbv2.model.Condition; +import com.amazonaws.services.dynamodbv2.document.BatchWriteItemOutcome; import com.amazonaws.services.dynamodbv2.model.CreateTableRequest; import com.amazonaws.services.dynamodbv2.model.GlobalSecondaryIndex; import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughput; import com.amazonaws.services.dynamodbv2.model.ResourceNotFoundException; import com.amazonaws.services.dynamodbv2.model.TableDescription; import com.amazonaws.services.dynamodbv2.model.TableStatus; -import com.google.common.collect.ImmutableMap; +import com.amazonaws.services.dynamodbv2.model.WriteRequest; /** * This is the implementation of the DynamoDB {@link PersistenceService}. It persists item values * using the Amazon DynamoDB database. The states ( - * {@link State}) of an {@link Item} are persisted in a time series with names equal to the name of - * the item. All values are stored using integers or doubles, {@link OnOffType} and - * {@link OpenClosedType} are stored using 0 or 1. + * {@link State}) of an {@link Item} are persisted in DynamoDB tables. * - * The default database name is "openhab" + * The service creates tables automatically, one for numbers, and one for strings. 
+ * + * @see AbstractDynamoDBItem.fromState for details how different items are persisted * * @author Sami Salonen * */ -public class DynamoDBPersistenceService implements QueryablePersistenceService { +public class DynamoDBPersistenceService extends AbstractBufferedPersistenceService> + implements QueryablePersistenceService { + + private class ExponentialBackoffRetry implements Runnable { + private int retry; + private Map> unprocessedItems; + private Exception lastException; + + public ExponentialBackoffRetry(Map> unprocessedItems) { + this.unprocessedItems = unprocessedItems; + } + + @Override + public void run() { + logger.debug("Error storing object to dynamo, unprocessed items: {}. Retrying with exponential back-off", + unprocessedItems); + lastException = null; + while (!unprocessedItems.isEmpty() && retry < WAIT_MILLIS_IN_RETRIES.length) { + if (!sleep()) { + // Interrupted + return; + } + retry++; + try { + BatchWriteItemOutcome outcome = DynamoDBPersistenceService.this.db.getDynamoDB() + .batchWriteItemUnprocessed(unprocessedItems); + unprocessedItems = outcome.getUnprocessedItems(); + lastException = null; + } catch (AmazonServiceException e) { + if (e instanceof ResourceNotFoundException) { + logger.debug( + "DynamoDB query raised unexpected exception: {}. This might happen if table was recently created", + e.getMessage()); + } else { + logger.debug("DynamoDB query raised unexpected exception: {}.", e.getMessage()); + } + lastException = e; + continue; + } + } + if (unprocessedItems.isEmpty()) { + logger.debug("After {} retries successfully wrote all unprocessed items", retry); + } else { + logger.warn( + "Even after retries failed to write some items. Last exception: {} {}, unprocessed items: {}", + lastException == null ? "null" : lastException.getClass().getName(), + lastException == null ? 
"null" : lastException.getMessage(), unprocessedItems); + } + } + + private boolean sleep() { + try { + long sleepTime; + if (retry == 1 && lastException != null && lastException instanceof ResourceNotFoundException) { + sleepTime = WAIT_ON_FIRST_RESOURCE_NOT_FOUND_MILLIS; + } else { + sleepTime = WAIT_MILLIS_IN_RETRIES[retry]; + } + Thread.sleep(sleepTime); + return true; + } catch (InterruptedException e) { + logger.debug("Interrupted while writing data!"); + return false; + } + } + + public Map> getUnprocessedItems() { + return unprocessedItems; + } + } + + private static final int WAIT_ON_FIRST_RESOURCE_NOT_FOUND_MILLIS = 5000; + private static final int[] WAIT_MILLIS_IN_RETRIES = new int[] { 100, 100, 200, 300, 500 }; + private static final String DYNAMODB_THREADPOOL_NAME = "dynamodbPersistenceService"; private ItemRegistry itemRegistry; private DynamoDBClient db; - private static final Logger logger = LoggerFactory.getLogger(DynamoDBPersistenceService.class); + private final Logger logger = LoggerFactory.getLogger(DynamoDBPersistenceService.class); private boolean isProperlyConfigured; private DynamoDBConfig dbConfig; private DynamoDBTableNameResolver tableNameResolver; + private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1, new NamedThreadFactory()); + private ScheduledFuture writeBufferedDataFuture; /** * For testing. Allows access to underlying DynamoDBClient. 
@@ -98,37 +178,47 @@ public void activate(final BundleContext bundleContext, final Map 0) { + writeBufferedDataFuture = scheduler.scheduleWithFixedDelay(new Runnable() { + @Override + public void run() { + DynamoDBPersistenceService.this.flushBufferedData(); + } + }, 0, commitIntervalMillis, TimeUnit.MILLISECONDS); + } isProperlyConfigured = true; logger.debug("dynamodb persistence service activated"); } public void deactivate() { logger.debug("dynamodb persistence service deactivated"); + if (writeBufferedDataFuture != null) { + writeBufferedDataFuture.cancel(false); + writeBufferedDataFuture = null; + } resetClient(); } /** - * Initializes DynamoDBClient (db field), if necessary, and checks the connection. + * Initializes DynamoDBClient (db field) * * If DynamoDBClient constructor throws an exception, error is logged and false is returned. * - * @return whether connection was successful. + * @return whether initialization was successful. */ - private boolean maybeConnectAndCheckConnection() { + private boolean ensureClient() { if (db == null) { try { db = new DynamoDBClient(dbConfig); @@ -137,7 +227,12 @@ private boolean maybeConnectAndCheckConnection() { return false; } } - return db.checkConnection(); + return true; + } + + @Override + public DynamoDBItem persistenceItemFromState(String name, State state, Date time) { + return AbstractDynamoDBItem.fromState(name, state, time); } /** @@ -242,54 +337,112 @@ private DynamoDBMapper getDBMapper(String tableName) { } @Override - public String getName() { - return "dynamodb"; + protected boolean isReadyToStore() { + return isProperlyConfigured && ensureClient(); } @Override - public void store(Item item) { - store(item, null); + public String getName() { + return "dynamodb"; } - /** - * {@inheritDoc} - */ @Override - public void store(Item item, String alias) { - if (item.getState() instanceof UnDefType) { - logger.debug("Undefined item state received. 
Not storing item."); + protected void flushBufferedData() { + if (buffer.isEmpty()) { return; } - if (!isProperlyConfigured) { - logger.warn("Configuration for dynamodb not yet loaded or broken. Not storing item."); - return; - } - if (!maybeConnectAndCheckConnection()) { - logger.warn("DynamoDB not connected. Not storing item."); - return; + logger.debug("Writing buffered data. Buffer size: {}", buffer.size()); + + for (;;) { + Map>> itemsByTable = readBuffer(); + // Write batch of data, one table at a time + for (Entry>> entry : itemsByTable.entrySet()) { + String tableName = entry.getKey(); + Deque> batch = entry.getValue(); + if (!batch.isEmpty()) { + flushBatch(getDBMapper(tableName), batch); + } + } + if (buffer.isEmpty()) { + break; + } } - String realName = item.getName(); - String name = (alias != null) ? alias : realName; - Date time = new Date(System.currentTimeMillis()); - - State state = item.getState(); - logger.trace("Tried to get item from item class {}, state is {}", item.getClass(), state.toString()); - DynamoDBItem dynamoItem = AbstractDynamoDBItem.fromState(name, state, time); - DynamoDBMapper mapper = getDBMapper(tableNameResolver.fromItem(dynamoItem)); + } - if (!createTable(mapper, dynamoItem.getClass())) { - logger.warn("Table creation failed. Not storing item"); - return; + private Map>> readBuffer() { + Map>> batchesByTable = new HashMap>>(2); + // Get batch of data + while (!buffer.isEmpty()) { + DynamoDBItem dynamoItem = buffer.poll(); + if (dynamoItem == null) { + break; + } + String tableName = tableNameResolver.fromItem(dynamoItem); + Deque> batch = batchesByTable.computeIfAbsent(tableName, + new Function>>() { + @Override + public Deque> apply(String t) { + return new ArrayDeque>(); + } + }); + batch.add(dynamoItem); } + return batchesByTable; + } - try { - logger.debug("storing {} in dynamo. Serialized value {}. 
Original Item: {}", name, state, item); - mapper.save(dynamoItem); - logger.debug("Sucessfully stored item {}", item); - } catch (AmazonClientException e) { - logger.error("Error storing object to dynamo: {}", e.getMessage()); + /** + * Flush batch of data to DynamoDB + * + * @param mapper mapper associated with the batch + * @param batch batch of data to write to DynamoDB + */ + private void flushBatch(DynamoDBMapper mapper, Deque> batch) { + long currentTimeMillis = System.currentTimeMillis(); + List failed = mapper.batchSave(batch); + for (FailedBatch failedBatch : failed) { + if (failedBatch.getException() instanceof ResourceNotFoundException) { + // Table did not exist. Try writing everything again. + retryFlushAfterCreatingTable(mapper, batch, failedBatch); + break; + } else { + logger.debug("Batch failed with {}. Retrying next with exponential back-off", + failedBatch.getException().getMessage()); + new ExponentialBackoffRetry(failedBatch.getUnprocessedItems()).run(); + } + } + if (failed.isEmpty()) { + logger.debug("flushBatch ended with {} items in {} ms: {}", batch.size(), + System.currentTimeMillis() - currentTimeMillis, batch); + } else { + logger.warn( + "flushBatch ended with {} items in {} ms: {}. There were some failed batches that were retried -- check logs for ERRORs to see if writes were successful", + batch.size(), System.currentTimeMillis() - currentTimeMillis, batch); } + } + /** + * Retry flushing data after creating table associated with mapper + * + * @param mapper mapper associated with the batch + * @param batch original batch of data. Used for logging and to determine table name + * @param failedBatch failed batch that should be retried + */ + private void retryFlushAfterCreatingTable(DynamoDBMapper mapper, Deque> batch, + FailedBatch failedBatch) { + logger.debug("Table was not found. 
Trying to create table and try saving again");
+                if (createTable(mapper, batch.peek().getClass())) {
+                    logger.debug("Table creation successful, trying to save again");
+                    if (!failedBatch.getUnprocessedItems().isEmpty()) {
+                        ExponentialBackoffRetry retry = new ExponentialBackoffRetry(failedBatch.getUnprocessedItems());
+                        retry.run();
+                        if (retry.getUnprocessedItems().isEmpty()) {
+                            logger.debug("Successfully saved items after table creation");
+                        }
+                    }
+                } else {
+                    logger.warn("Table creation failed. Not storing some parts of batch: {}. Unprocessed items: {}", batch,
+                            failedBatch.getUnprocessedItems());
+                }
     }
 
     /**
@@ -302,7 +455,7 @@ public Iterable<HistoricItem> query(FilterCriteria filter) {
             logger.warn("Configuration for dynamodb not yet loaded or broken. Not storing item.");
             return Collections.emptyList();
         }
-        if (!maybeConnectAndCheckConnection()) {
+        if (!ensureClient()) {
             logger.warn("DynamoDB not connected. Not storing item.");
             return Collections.emptyList();
         }
@@ -322,7 +475,8 @@ public Iterable<HistoricItem> query(FilterCriteria filter) {
 
         List<HistoricItem> historicItems = new ArrayList<HistoricItem>();
 
-        DynamoDBQueryExpression<DynamoDBItem<?>> queryExpression = createQueryExpression(dtoClass, filter);
+        DynamoDBQueryExpression<DynamoDBItem<?>> queryExpression = DynamoDBQueryUtils.createQueryExpression(dtoClass,
+                filter);
         @SuppressWarnings("rawtypes")
         final PaginatedQueryList paginatedList;
         try {
@@ -353,121 +507,6 @@ public Iterable<HistoricItem> query(FilterCriteria filter) {
         return historicItems;
     }
 
-    /**
-     * Construct dynamodb query from filter
-     *
-     * @param filter
-     * @return DynamoDBQueryExpression corresponding to the given FilterCriteria
-     */
-    private DynamoDBQueryExpression<DynamoDBItem<?>> createQueryExpression(Class<? extends DynamoDBItem<?>> dtoClass,
-            FilterCriteria filter) {
-        DynamoDBItem<?> item = getDynamoDBHashKey(dtoClass, filter.getItemName());
-        final DynamoDBQueryExpression<DynamoDBItem<?>> queryExpression = new DynamoDBQueryExpression<DynamoDBItem<?>>()
-                .withHashKeyValues(item).withScanIndexForward(filter.getOrdering() == Ordering.ASCENDING)
-                .withLimit(filter.getPageSize());
-        Condition timeFilter = maybeAddTimeFilter(queryExpression, filter);
-        maybeAddStateFilter(filter, queryExpression);
-        logger.debug("Querying: {} with {}", filter.getItemName(), timeFilter);
-        return queryExpression;
-    }
-
-    private DynamoDBItem<?> getDynamoDBHashKey(Class<? extends DynamoDBItem<?>> dtoClass, String itemName) {
-        DynamoDBItem<?> item;
-        try {
-            item = dtoClass.newInstance();
-        } catch (InstantiationException e) {
-            throw new RuntimeException(e);
-        } catch (IllegalAccessException e) {
-            throw new RuntimeException(e);
-        }
-        item.setName(itemName);
-        return item;
-    }
-
-    private void maybeAddStateFilter(FilterCriteria filter,
-            final DynamoDBQueryExpression<DynamoDBItem<?>> queryExpression) {
-        if (filter.getOperator() != null && filter.getState() != null) {
-            // Convert filter's state to DynamoDBItem in order get suitable string representation for the state
-            final DynamoDBItem<?> filterState = AbstractDynamoDBItem.fromState(filter.getItemName(), filter.getState(),
-                    new Date());
-            queryExpression.setFilterExpression(String.format("%s %s :opstate", DynamoDBItem.ATTRIBUTE_NAME_ITEMSTATE,
-                    operatorAsString(filter.getOperator())));
-
-            filterState.accept(new DynamoDBItemVisitor() {
-
-                @Override
-                public void visit(DynamoDBStringItem dynamoStringItem) {
-                    queryExpression.setExpressionAttributeValues(
-                            ImmutableMap.of(":opstate", new AttributeValue().withS(dynamoStringItem.getState())));
-                }
-
-                @Override
-                public void visit(DynamoDBBigDecimalItem dynamoBigDecimalItem) {
-                    queryExpression.setExpressionAttributeValues(ImmutableMap.of(":opstate",
-                            new AttributeValue().withN(dynamoBigDecimalItem.getState().toPlainString())));
-                }
-            });
-
-        }
-    }
-
-    private Condition maybeAddTimeFilter(final DynamoDBQueryExpression<DynamoDBItem<?>> queryExpression,
-            final FilterCriteria filter) {
-        final Condition timeCondition = constructTimeCondition(filter);
-        if (timeCondition != null) {
-            queryExpression.setRangeKeyConditions(
-                    Collections.singletonMap(DynamoDBItem.ATTRIBUTE_NAME_TIMEUTC, timeCondition));
-        }
-        return timeCondition;
-    }
-
-    private Condition constructTimeCondition(FilterCriteria filter) {
-        boolean hasBegin = filter.getBeginDate() != null;
-        boolean hasEnd = filter.getEndDate() != null;
-
-        final Condition timeCondition;
-        if (!hasBegin && !hasEnd) {
-            timeCondition = null;
-        } else if (!hasBegin && hasEnd) {
-            timeCondition = new Condition().withComparisonOperator(ComparisonOperator.LE).withAttributeValueList(
-                    new AttributeValue().withS(AbstractDynamoDBItem.DATEFORMATTER.format(filter.getEndDate())));
-        } else if (hasBegin && !hasEnd) {
-            timeCondition = new Condition().withComparisonOperator(ComparisonOperator.GE).withAttributeValueList(
-                    new AttributeValue().withS(AbstractDynamoDBItem.DATEFORMATTER.format(filter.getBeginDate())));
-        } else {
-            timeCondition = new Condition().withComparisonOperator(ComparisonOperator.BETWEEN).withAttributeValueList(
-                    new AttributeValue().withS(AbstractDynamoDBItem.DATEFORMATTER.format(filter.getBeginDate())),
-                    new AttributeValue().withS(AbstractDynamoDBItem.DATEFORMATTER.format(filter.getEndDate())));
-        }
-        return timeCondition;
-    }
-
-    /**
-     * Convert op to string suitable for dynamodb filter expression
-     *
-     * @param op
-     * @return string representation corresponding to the given the Operator
-     */
-    private static String operatorAsString(Operator op) {
-        switch (op) {
-            case EQ:
-                return "=";
-            case NEQ:
-                return "<>";
-            case LT:
-                return "<";
-            case LTE:
-                return "<=";
-            case GT:
-                return ">";
-            case GTE:
-                return ">=";
-
-            default:
-                throw new IllegalStateException("Unknown operator " + op);
-        }
-    }
-
     /**
      * Retrieves the item for the given name from the item registry
      *
@@ -486,4 +525,36 @@ private Item getItemFromRegistry(String itemName) {
         return item;
     }
 
+    /**
+     * This is a normal thread factory, which adds a named prefix to all created
+     * threads.
+     *
+     * Adapted from RRD4jService
+     */
+    private static class NamedThreadFactory implements ThreadFactory {
+
+        protected final ThreadGroup group;
+        protected final AtomicInteger threadNumber = new AtomicInteger(1);
+        protected final String namePrefix;
+
+        public NamedThreadFactory() {
+            this.namePrefix = DYNAMODB_THREADPOOL_NAME;
+            SecurityManager s = System.getSecurityManager();
+            group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
+        }
+
+        @Override
+        public Thread newThread(Runnable r) {
+            Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
+            if (!t.isDaemon()) {
+                t.setDaemon(true);
+            }
+            if (t.getPriority() != Thread.NORM_PRIORITY) {
+                t.setPriority(Thread.NORM_PRIORITY);
+            }
+
+            return t;
+        }
+    }
+
 }
diff --git a/bundles/org.openhab.persistence.dynamodb/src/main/java/org/openhab/persistence/dynamodb/internal/DynamoDBQueryUtils.java b/bundles/org.openhab.persistence.dynamodb/src/main/java/org/openhab/persistence/dynamodb/internal/DynamoDBQueryUtils.java
new file mode 100644
index 0000000000000..a646ab74b5f47
--- /dev/null
+++ b/bundles/org.openhab.persistence.dynamodb/src/main/java/org/openhab/persistence/dynamodb/internal/DynamoDBQueryUtils.java
@@ -0,0 +1,129 @@
+package org.openhab.persistence.dynamodb.internal;
+
+import java.util.Collections;
+import java.util.Date;
+
+import org.openhab.core.persistence.FilterCriteria;
+import org.openhab.core.persistence.FilterCriteria.Operator;
+import org.openhab.core.persistence.FilterCriteria.Ordering;
+
+import com.amazonaws.services.dynamodbv2.datamodeling.DynamoDBQueryExpression;
+import com.amazonaws.services.dynamodbv2.model.AttributeValue;
+import com.amazonaws.services.dynamodbv2.model.ComparisonOperator;
+import com.amazonaws.services.dynamodbv2.model.Condition;
+
+public class DynamoDBQueryUtils {
+    /**
+     * Construct dynamodb query from filter
+     *
+     * @param filter
+     * @return DynamoDBQueryExpression corresponding to the given FilterCriteria
+     */
+    public static DynamoDBQueryExpression<DynamoDBItem<?>> createQueryExpression(
+            Class<? extends DynamoDBItem<?>> dtoClass, FilterCriteria filter) {
+        DynamoDBItem<?> item = getDynamoDBHashKey(dtoClass, filter.getItemName());
+        final DynamoDBQueryExpression<DynamoDBItem<?>> queryExpression = new DynamoDBQueryExpression<DynamoDBItem<?>>()
+                .withHashKeyValues(item).withScanIndexForward(filter.getOrdering() == Ordering.ASCENDING)
+                .withLimit(filter.getPageSize());
+        maybeAddTimeFilter(queryExpression, filter);
+        maybeAddStateFilter(filter, queryExpression);
+        return queryExpression;
+    }
+
+    private static DynamoDBItem<?> getDynamoDBHashKey(Class<? extends DynamoDBItem<?>> dtoClass, String itemName) {
+        DynamoDBItem<?> item;
+        try {
+            item = dtoClass.newInstance();
+        } catch (InstantiationException e) {
+            throw new RuntimeException(e);
+        } catch (IllegalAccessException e) {
+            throw new RuntimeException(e);
+        }
+        item.setName(itemName);
+        return item;
+    }
+
+    private static void maybeAddStateFilter(FilterCriteria filter,
+            final DynamoDBQueryExpression<DynamoDBItem<?>> queryExpression) {
+        if (filter.getOperator() != null && filter.getState() != null) {
+            // Convert filter's state to DynamoDBItem in order get suitable string representation for the state
+            final DynamoDBItem<?> filterState = AbstractDynamoDBItem.fromState(filter.getItemName(), filter.getState(),
+                    new Date());
+            queryExpression.setFilterExpression(String.format("%s %s :opstate", DynamoDBItem.ATTRIBUTE_NAME_ITEMSTATE,
+                    operatorAsString(filter.getOperator())));
+
+            filterState.accept(new DynamoDBItemVisitor() {
+
+                @Override
+                public void visit(DynamoDBStringItem dynamoStringItem) {
+                    queryExpression.setExpressionAttributeValues(Collections.singletonMap(":opstate",
+                            new AttributeValue().withS(dynamoStringItem.getState())));
+                }
+
+                @Override
+                public void visit(DynamoDBBigDecimalItem dynamoBigDecimalItem) {
+                    queryExpression.setExpressionAttributeValues(Collections.singletonMap(":opstate",
+                            new AttributeValue().withN(dynamoBigDecimalItem.getState().toPlainString())));
+                }
+            });
+
+        }
+    }
+
+    private static Condition maybeAddTimeFilter(final DynamoDBQueryExpression<DynamoDBItem<?>> queryExpression,
+            final FilterCriteria filter) {
+        final Condition timeCondition = constructTimeCondition(filter);
+        if (timeCondition != null) {
+            queryExpression.setRangeKeyConditions(
+                    Collections.singletonMap(DynamoDBItem.ATTRIBUTE_NAME_TIMEUTC, timeCondition));
+        }
+        return timeCondition;
+    }
+
+    private static Condition constructTimeCondition(FilterCriteria filter) {
+        boolean hasBegin = filter.getBeginDate() != null;
+        boolean hasEnd = filter.getEndDate() != null;
+
+        final Condition timeCondition;
+        if (!hasBegin && !hasEnd) {
+            timeCondition = null;
+        } else if (!hasBegin && hasEnd) {
+            timeCondition = new Condition().withComparisonOperator(ComparisonOperator.LE).withAttributeValueList(
+                    new AttributeValue().withS(AbstractDynamoDBItem.DATEFORMATTER.format(filter.getEndDate())));
+        } else if (hasBegin && !hasEnd) {
+            timeCondition = new Condition().withComparisonOperator(ComparisonOperator.GE).withAttributeValueList(
+                    new AttributeValue().withS(AbstractDynamoDBItem.DATEFORMATTER.format(filter.getBeginDate())));
+        } else {
+            timeCondition = new Condition().withComparisonOperator(ComparisonOperator.BETWEEN).withAttributeValueList(
+                    new AttributeValue().withS(AbstractDynamoDBItem.DATEFORMATTER.format(filter.getBeginDate())),
+                    new AttributeValue().withS(AbstractDynamoDBItem.DATEFORMATTER.format(filter.getEndDate())));
+        }
+        return timeCondition;
+    }
+
+    /**
+     * Convert op to string suitable for dynamodb filter expression
+     *
+     * @param op
+     * @return string representation corresponding to the given the Operator
+     */
+    private static String operatorAsString(Operator op) {
+        switch (op) {
+            case EQ:
+                return "=";
+            case NEQ:
+                return "<>";
+            case LT:
+                return "<";
+            case LTE:
+                return "<=";
+            case GT:
+                return ">";
+            case GTE:
+                return ">=";
+
+            default:
+                throw new IllegalStateException("Unknown operator " + op);
+        }
+    }
+}
diff --git a/bundles/org.openhab.persistence.dynamodb/src/test/java/org/openhab/persistence/dynamodb/internal/AbstractDynamoDBItemSerializationTest.java b/bundles/org.openhab.persistence.dynamodb/src/test/java/org/openhab/persistence/dynamodb/internal/AbstractDynamoDBItemSerializationTest.java
index 4c26db856f35f..f2507624ac2d9 100644
--- a/bundles/org.openhab.persistence.dynamodb/src/test/java/org/openhab/persistence/dynamodb/internal/AbstractDynamoDBItemSerializationTest.java
+++ b/bundles/org.openhab.persistence.dynamodb/src/test/java/org/openhab/persistence/dynamodb/internal/AbstractDynamoDBItemSerializationTest.java
@@ -56,22 +56,25 @@ public class AbstractDynamoDBItemSerializationTest {
      * Generic function testing serialization of item state to internal format in DB. In other words, conversion of
      * Item with state to DynamoDBItem
      *
-     * @param state item state
+     * @param state         item state
      * @param expectedState internal format in DB representing the item state
      * @return dynamo db item
      * @throws IOException
      */
-    @SuppressWarnings("unchecked")
     public DynamoDBItem<?> testStateGeneric(State state, Object expectedState) throws IOException {
         DynamoDBItem<?> dbItem = AbstractDynamoDBItem.fromState("item1", state, date);
 
         assertEquals("item1", dbItem.getName());
         assertEquals(date, dbItem.getTime());
+        Object actualState = dbItem.getState();
         if (expectedState instanceof BigDecimal) {
-            assertTrue(DynamoDBBigDecimalItem.loseDigits(((BigDecimal) expectedState))
-                    .compareTo((((DynamoDBItem<BigDecimal>) dbItem).getState())) == 0);
+            BigDecimal expectedRounded = DynamoDBBigDecimalItem.loseDigits(((BigDecimal) expectedState));
+            assertTrue(
+                    String.format("Expected state %s (%s but with some digits lost) did not match actual state %s",
+                            expectedRounded, expectedState, actualState),
+                    expectedRounded.compareTo((BigDecimal) actualState) == 0);
         } else {
-            assertEquals(expectedState, dbItem.getState());
+            assertEquals(expectedState, actualState);
         }
         return dbItem;
     }
@@ -79,8 +82,8 @@ public DynamoDBItem<?> testStateGeneric(State state, Object expectedState) throw
     /**
      * Test state deserialization, that is DynamoDBItem conversion to HistoricItem
      *
-     * @param dbItem dynamo db item
-     * @param item parameter for DynamoDBItem.asHistoricItem
+     * @param dbItem        dynamo db item
+     * @param item          parameter for DynamoDBItem.asHistoricItem
      * @param expectedState Expected state of the historic item. DecimalTypes are compared with reduced accuracy
      * @return
      * @throws IOException
@@ -94,8 +97,11 @@ public HistoricItem testAsHistoricGeneric(DynamoDBItem<?> dbItem, Item item, Obj
         assertEquals(expectedState.getClass(), historicItem.getState().getClass());
         if (expectedState instanceof DecimalType) {
             // serialization loses accuracy, take this into consideration
-            assertTrue(DynamoDBBigDecimalItem.loseDigits(((DecimalType) expectedState).toBigDecimal())
-                    .compareTo(((DecimalType) historicItem.getState()).toBigDecimal()) == 0);
+            BigDecimal expectedRounded = DynamoDBBigDecimalItem
+                    .loseDigits(((DecimalType) expectedState).toBigDecimal());
+            BigDecimal actual = ((DecimalType) historicItem.getState()).toBigDecimal();
+            assertTrue(String.format("Expected state %s (%s but with some digits lost) did not match actual state %s",
+                    expectedRounded, expectedState, actual), expectedRounded.compareTo(actual) == 0);
         } else if (expectedState instanceof CallType) {
             // CallType has buggy equals, let's compare strings instead
             assertEquals(expectedState.toString(), historicItem.getState().toString());
@@ -181,33 +187,33 @@ public void testPointTypeWithLocationItem() throws IOException {
 
     @Test
     public void testDecimalTypeWithNumberItem() throws IOException {
-        DynamoDBItem<?> dbitem = testStateGeneric(new DecimalType(3.2), new BigDecimal(3.2));
-        testAsHistoricGeneric(dbitem, new NumberItem("foo"), new DecimalType(3.2));
+        DynamoDBItem<?> dbitem = testStateGeneric(new DecimalType("3.2"), new BigDecimal("3.2"));
+        testAsHistoricGeneric(dbitem, new NumberItem("foo"), new DecimalType("3.2"));
     }
 
     @Test
     public void testPercentTypeWithColorItem() throws IOException {
-        DynamoDBItem<?> dbitem = testStateGeneric(new PercentType(new BigDecimal(3.2)), new BigDecimal(3.2));
-        testAsHistoricGeneric(dbitem, new ColorItem("foo"), new PercentType(new BigDecimal(3.2)));
+        DynamoDBItem<?> dbitem = testStateGeneric(new PercentType(new BigDecimal("3.2")), new BigDecimal("3.2"));
+        testAsHistoricGeneric(dbitem, new ColorItem("foo"), new PercentType(new BigDecimal("3.2")));
     }
 
     @Test
     public void testPercentTypeWithDimmerItem() throws IOException {
-        DynamoDBItem<?> dbitem = testStateGeneric(new PercentType(new BigDecimal(3.2)), new BigDecimal(3.2));
-        testAsHistoricGeneric(dbitem, new DimmerItem("foo"), new PercentType(new BigDecimal(3.2)));
+        DynamoDBItem<?> dbitem = testStateGeneric(new PercentType(new BigDecimal("3.2")), new BigDecimal("3.2"));
+        testAsHistoricGeneric(dbitem, new DimmerItem("foo"), new PercentType(new BigDecimal("3.2")));
     }
 
     @Test
     public void testPercentTypeWithRollerShutterItem() throws IOException {
-        DynamoDBItem<?> dbitem = testStateGeneric(new PercentType(new BigDecimal(3.2)), new BigDecimal(3.2));
-        testAsHistoricGeneric(dbitem, new RollershutterItem("foo"), new PercentType(new BigDecimal(3.2)));
+        DynamoDBItem<?> dbitem = testStateGeneric(new PercentType(new BigDecimal("3.2")), new BigDecimal("3.2"));
+        testAsHistoricGeneric(dbitem, new RollershutterItem("foo"), new PercentType(new BigDecimal("3.2")));
     }
 
     @Test
     public void testPercentTypeWithNumberItem() throws IOException {
-        DynamoDBItem<?> dbitem = testStateGeneric(new PercentType(new BigDecimal(3.2)), new BigDecimal(3.2));
+        DynamoDBItem<?> dbitem = testStateGeneric(new PercentType(new BigDecimal("3.2")), new BigDecimal("3.2"));
         // note: comes back as DecimalType instead of the original PercentType
-        testAsHistoricGeneric(dbitem, new NumberItem("foo"), new DecimalType(new BigDecimal(3.2)));
+        testAsHistoricGeneric(dbitem, new NumberItem("foo"), new DecimalType(new BigDecimal("3.2")));
     }
 
     @Test
diff --git
a/bundles/org.openhab.persistence.dynamodb/src/test/java/org/openhab/persistence/dynamodb/internal/BaseIntegrationTest.java b/bundles/org.openhab.persistence.dynamodb/src/test/java/org/openhab/persistence/dynamodb/internal/BaseIntegrationTest.java
index e456a448a2670..a6787e535bbbc 100644
--- a/bundles/org.openhab.persistence.dynamodb/src/test/java/org/openhab/persistence/dynamodb/internal/BaseIntegrationTest.java
+++ b/bundles/org.openhab.persistence.dynamodb/src/test/java/org/openhab/persistence/dynamodb/internal/BaseIntegrationTest.java
@@ -36,7 +36,7 @@
 import com.amazonaws.services.dynamodbv2.model.ResourceNotFoundException;
 
 /**
- * 
+ *
  * @author Sami Salonen
  *
 */
@@ -112,6 +112,9 @@ public void addItemRegistryChangeListener(ItemRegistryChangeListener listener) {
         config.put("secretKey", System.getProperty("DYNAMODBTEST_SECRET"));
         config.put("tablePrefix", "dynamodb-integration-tests-");
 
+        // Disable buffering
+        config.put("bufferSize", "0");
+
         for (Entry<String, Object> entry : config.entrySet()) {
             if (entry.getValue() == null) {
                 logger.warn(String.format(
diff --git a/bundles/org.openhab.persistence.dynamodb/src/test/java/org/openhab/persistence/dynamodb/internal/DynamoDBConfigTest.java b/bundles/org.openhab.persistence.dynamodb/src/test/java/org/openhab/persistence/dynamodb/internal/DynamoDBConfigTest.java
index dec85d35370bf..189da8cb5cc63 100644
--- a/bundles/org.openhab.persistence.dynamodb/src/test/java/org/openhab/persistence/dynamodb/internal/DynamoDBConfigTest.java
+++ b/bundles/org.openhab.persistence.dynamodb/src/test/java/org/openhab/persistence/dynamodb/internal/DynamoDBConfigTest.java
@@ -11,7 +11,9 @@
 import static org.junit.Assert.*;
 
 import java.io.File;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.Map;
 
 import org.apache.commons.io.FileUtils;
 import org.junit.Rule;
@@ -19,7 +21,6 @@
 import org.junit.rules.TemporaryFolder;
 
 import com.amazonaws.regions.Regions;
-import com.google.common.collect.ImmutableMap;
 
 /**
  *
@@ -28,6 +29,17 @@
  */
 public class DynamoDBConfigTest {
 
+    private static Map<String, Object> mapFrom(String... args) {
+        assert args.length % 2 == 0;
+        Map<String, Object> config = new HashMap<>();
+        for (int i = 1; i < args.length; i += 2) {
+            String key = args[i - 1];
+            String val = args[i];
+            config.put(key, val);
+        }
+        return Collections.unmodifiableMap(config);
+    }
+
     @Rule
     public TemporaryFolder folder = new TemporaryFolder();
 
@@ -38,18 +50,18 @@ public void testEmpty() throws Exception {
 
     @Test
     public void testInvalidRegion() throws Exception {
-        assertNull(DynamoDBConfig.fromConfig(ImmutableMap.<String, Object> of("region", "foobie")));
+        assertNull(DynamoDBConfig.fromConfig(Collections.singletonMap("region", "foobie")));
     }
 
     @Test
     public void testRegionOnly() throws Exception {
-        assertNull(DynamoDBConfig.fromConfig(ImmutableMap.<String, Object> of("region", "eu-west-1")));
+        assertNull(DynamoDBConfig.fromConfig(Collections.singletonMap("region", "eu-west-1")));
    }
 
     @Test
     public void testRegionWithAccessKeys() throws Exception {
-        DynamoDBConfig fromConfig = DynamoDBConfig.fromConfig(ImmutableMap.<String, Object> of("region", "eu-west-1",
-                "accessKey", "access1", "secretKey", "secret1"));
+        DynamoDBConfig fromConfig = DynamoDBConfig
+                .fromConfig(mapFrom("region", "eu-west-1", "accessKey", "access1", "secretKey", "secret1"));
         assertEquals(Regions.EU_WEST_1, fromConfig.getRegion());
         assertEquals("access1", fromConfig.getCredentials().getAWSAccessKeyId());
         assertEquals("secret1", fromConfig.getCredentials().getAWSSecretKey());
@@ -57,6 +69,8 @@ public void testRegionWithAccessKeys() throws Exception {
         assertEquals(true, fromConfig.isCreateTable());
         assertEquals(1, fromConfig.getReadCapacityUnits());
         assertEquals(1, fromConfig.getWriteCapacityUnits());
+        assertEquals(1000L, fromConfig.getBufferCommitIntervalMillis());
+        assertEquals(1000, fromConfig.getBufferSize());
     }
 
     @Test
@@ -65,13 +79,15 @@ public void testRegionWithProfilesConfigFile() throws Exception {
         FileUtils.write(credsFile, "[fooprofile]\n" + "aws_access_key_id=testAccessKey\n"
                 + "aws_secret_access_key=testSecretKey\n" + "aws_session_token=testSessionToken\n");
-        DynamoDBConfig fromConfig = DynamoDBConfig.fromConfig(ImmutableMap.<String, Object> of("region", "eu-west-1",
-                "profilesConfigFile", credsFile.getAbsolutePath(), "profile", "fooprofile"));
+        DynamoDBConfig fromConfig = DynamoDBConfig.fromConfig(mapFrom("region", "eu-west-1", "profilesConfigFile",
+                credsFile.getAbsolutePath(), "profile", "fooprofile"));
         assertEquals(Regions.EU_WEST_1, fromConfig.getRegion());
         assertEquals("openhab-", fromConfig.getTablePrefix());
         assertEquals(true, fromConfig.isCreateTable());
         assertEquals(1, fromConfig.getReadCapacityUnits());
         assertEquals(1, fromConfig.getWriteCapacityUnits());
+        assertEquals(1000L, fromConfig.getBufferCommitIntervalMillis());
+        assertEquals(1000, fromConfig.getBufferSize());
     }
 
     @Test
@@ -81,7 +97,7 @@ public void testNullConfiguration() throws Exception {
 
     @Test
     public void testEmptyConfiguration() throws Exception {
-        assertNull(DynamoDBConfig.fromConfig(ImmutableMap.<String, Object> of()));
+        assertNull(DynamoDBConfig.fromConfig(mapFrom()));
     }
 
     @Test
@@ -90,8 +106,8 @@ public void testRegionWithInvalidProfilesConfigFile() throws Exception {
         FileUtils.write(credsFile, "[fooprofile]\n" + "aws_access_key_idINVALIDKEY=testAccessKey\n"
                 + "aws_secret_access_key=testSecretKey\n" + "aws_session_token=testSessionToken\n");
-        assertNull(DynamoDBConfig.fromConfig(ImmutableMap.<String, Object> of("region", "eu-west-1",
-                "profilesConfigFile", credsFile.getAbsolutePath(), "profile", "fooprofile")));
+        assertNull(DynamoDBConfig.fromConfig(mapFrom("region", "eu-west-1", "profilesConfigFile",
+                credsFile.getAbsolutePath(), "profile", "fooprofile")));
     }
 
     @Test
@@ -100,14 +116,14 @@ public void testRegionWithProfilesConfigFileMissingProfile() throws Exception {
         FileUtils.write(credsFile, "[fooprofile]\n" + "aws_access_key_id=testAccessKey\n"
                 + "aws_secret_access_key=testSecretKey\n" + "aws_session_token=testSessionToken\n");
-        assertNull(DynamoDBConfig.fromConfig(ImmutableMap.<String, Object> of("region", "eu-west-1",
-                "profilesConfigFile", credsFile.getAbsolutePath())));
+        assertNull(DynamoDBConfig
+                .fromConfig(mapFrom("region", "eu-west-1", "profilesConfigFile", credsFile.getAbsolutePath())));
     }
 
     @Test
     public void testRegionWithAccessKeysWithPrefix() throws Exception {
-        DynamoDBConfig fromConfig = DynamoDBConfig.fromConfig(ImmutableMap.<String, Object> of("region", "eu-west-1",
-                "accessKey", "access1", "secretKey", "secret1", "tablePrefix", "foobie-"));
+        DynamoDBConfig fromConfig = DynamoDBConfig.fromConfig(mapFrom("region", "eu-west-1", "accessKey", "access1",
+                "secretKey", "secret1", "tablePrefix", "foobie-"));
         assertEquals(Regions.EU_WEST_1, fromConfig.getRegion());
         assertEquals("access1", fromConfig.getCredentials().getAWSAccessKeyId());
         assertEquals("secret1", fromConfig.getCredentials().getAWSSecretKey());
@@ -115,12 +131,14 @@ public void testRegionWithAccessKeysWithPrefix() throws Exception {
         assertEquals(true, fromConfig.isCreateTable());
         assertEquals(1, fromConfig.getReadCapacityUnits());
         assertEquals(1, fromConfig.getWriteCapacityUnits());
+        assertEquals(1000L, fromConfig.getBufferCommitIntervalMillis());
+        assertEquals(1000, fromConfig.getBufferSize());
     }
 
     @Test
     public void testRegionWithAccessKeysWithPrefixWithCreateTable() throws Exception {
-        DynamoDBConfig fromConfig = DynamoDBConfig.fromConfig(ImmutableMap.<String, Object> of("region", "eu-west-1",
-                "accessKey", "access1", "secretKey", "secret1", "createTable", "false"));
+        DynamoDBConfig fromConfig = DynamoDBConfig.fromConfig(
+                mapFrom("region", "eu-west-1", "accessKey", "access1", "secretKey", "secret1", "createTable", "false"));
         assertEquals(Regions.EU_WEST_1, fromConfig.getRegion());
         assertEquals("access1", fromConfig.getCredentials().getAWSAccessKeyId());
         assertEquals("secret1", fromConfig.getCredentials().getAWSSecretKey());
@@ -128,12 +146,14 @@ public void testRegionWithAccessKeysWithPrefixWithCreateTable() throws Exception
         assertEquals(false, fromConfig.isCreateTable());
         assertEquals(1, fromConfig.getReadCapacityUnits());
         assertEquals(1, fromConfig.getWriteCapacityUnits());
+        assertEquals(1000L, fromConfig.getBufferCommitIntervalMillis());
+        assertEquals(1000, fromConfig.getBufferSize());
     }
 
     @Test
     public void testRegionWithAccessKeysWithPrefixWithReadCapacityUnits() throws Exception {
-        DynamoDBConfig fromConfig = DynamoDBConfig.fromConfig(ImmutableMap.<String, Object> of("region", "eu-west-1",
-                "accessKey", "access1", "secretKey", "secret1", "readCapacityUnits", "5"));
+        DynamoDBConfig fromConfig = DynamoDBConfig.fromConfig(mapFrom("region", "eu-west-1", "accessKey", "access1",
+                "secretKey", "secret1", "readCapacityUnits", "5"));
         assertEquals(Regions.EU_WEST_1, fromConfig.getRegion());
         assertEquals("access1", fromConfig.getCredentials().getAWSAccessKeyId());
         assertEquals("secret1", fromConfig.getCredentials().getAWSSecretKey());
@@ -141,12 +161,14 @@ public void testRegionWithAccessKeysWithPrefixWithReadCapacityUnits() throws Exc
         assertEquals(true, fromConfig.isCreateTable());
         assertEquals(5, fromConfig.getReadCapacityUnits());
         assertEquals(1, fromConfig.getWriteCapacityUnits());
+        assertEquals(1000L, fromConfig.getBufferCommitIntervalMillis());
+        assertEquals(1000, fromConfig.getBufferSize());
     }
 
     @Test
     public void testRegionWithAccessKeysWithPrefixWithWriteCapacityUnits() throws Exception {
-        DynamoDBConfig fromConfig = DynamoDBConfig.fromConfig(ImmutableMap.<String, Object> of("region", "eu-west-1",
-                "accessKey", "access1", "secretKey", "secret1", "writeCapacityUnits", "5"));
+        DynamoDBConfig fromConfig = DynamoDBConfig.fromConfig(mapFrom("region", "eu-west-1", "accessKey", "access1",
+                "secretKey", "secret1", "writeCapacityUnits", "5"));
         assertEquals(Regions.EU_WEST_1, fromConfig.getRegion());
         assertEquals("access1", fromConfig.getCredentials().getAWSAccessKeyId());
         assertEquals("secret1", fromConfig.getCredentials().getAWSSecretKey());
@@ -154,12 +176,30 @@ public void testRegionWithAccessKeysWithPrefixWithWriteCapacityUnits() throws Ex
         assertEquals(true, fromConfig.isCreateTable());
         assertEquals(1, fromConfig.getReadCapacityUnits());
         assertEquals(5, fromConfig.getWriteCapacityUnits());
+        assertEquals(1000L, fromConfig.getBufferCommitIntervalMillis());
+        assertEquals(1000, fromConfig.getBufferSize());
     }
 
     @Test
     public void testRegionWithAccessKeysWithPrefixWithReadWriteCapacityUnits() throws Exception {
-        DynamoDBConfig fromConfig = DynamoDBConfig.fromConfig(ImmutableMap.<String, Object> of("region", "eu-west-1",
-                "accessKey", "access1", "secretKey", "secret1", "readCapacityUnits", "3", "writeCapacityUnits", "5"));
+        DynamoDBConfig fromConfig = DynamoDBConfig.fromConfig(mapFrom("region", "eu-west-1", "accessKey", "access1",
+                "secretKey", "secret1", "readCapacityUnits", "3", "writeCapacityUnits", "5"));
+        assertEquals(Regions.EU_WEST_1, fromConfig.getRegion());
+        assertEquals("access1", fromConfig.getCredentials().getAWSAccessKeyId());
+        assertEquals("secret1", fromConfig.getCredentials().getAWSSecretKey());
+        assertEquals("openhab-", fromConfig.getTablePrefix());
+        assertEquals(true, fromConfig.isCreateTable());
+        assertEquals(3, fromConfig.getReadCapacityUnits());
+        assertEquals(5, fromConfig.getWriteCapacityUnits());
+        assertEquals(1000L, fromConfig.getBufferCommitIntervalMillis());
+        assertEquals(1000, fromConfig.getBufferSize());
+    }
+
+    @Test
+    public void testRegionWithAccessKeysWithPrefixWithReadWriteCapacityUnitsWithBufferSettings() throws Exception {
+        DynamoDBConfig fromConfig = DynamoDBConfig.fromConfig(
+                mapFrom("region", "eu-west-1", "accessKey", "access1", "secretKey", "secret1", "readCapacityUnits", "3",
+                        "writeCapacityUnits", "5", "bufferCommitIntervalMillis", "501", "bufferSize", "112"));
         assertEquals(Regions.EU_WEST_1, fromConfig.getRegion());
         assertEquals("access1", fromConfig.getCredentials().getAWSAccessKeyId());
         assertEquals("secret1", fromConfig.getCredentials().getAWSSecretKey());
@@ -167,5 +207,7 @@ public void testRegionWithAccessKeysWithPrefixWithReadWriteCapacityUnits() throw
         assertEquals(true, fromConfig.isCreateTable());
         assertEquals(3, fromConfig.getReadCapacityUnits());
         assertEquals(5, fromConfig.getWriteCapacityUnits());
+        assertEquals(501L, fromConfig.getBufferCommitIntervalMillis());
+        assertEquals(112, fromConfig.getBufferSize());
     }
 }

From bcf491be300ca0f20618f2453d458d8d7c4be566 Mon Sep 17 00:00:00 2001
From: Sami Salonen
Date: Sun, 28 Apr 2019 13:32:03 +0300
Subject: [PATCH 2/4] [dynamodb] Declaring
 dependency to AWS SDK

Signed-off-by: Sami Salonen
---
 bundles/org.openhab.persistence.dynamodb/pom.xml           | 7 +++++++
 features/karaf/openhab-addons/src/main/feature/feature.xml | 2 ++
 2 files changed, 9 insertions(+)

diff --git a/bundles/org.openhab.persistence.dynamodb/pom.xml b/bundles/org.openhab.persistence.dynamodb/pom.xml
index ad30ee6fadce3..0b19540a23c05 100644
--- a/bundles/org.openhab.persistence.dynamodb/pom.xml
+++ b/bundles/org.openhab.persistence.dynamodb/pom.xml
@@ -13,4 +13,11 @@
   <name>openHAB Add-ons :: Bundles :: DynamoDB Persistence</name>
 
+  <dependencies>
+    <dependency>
+      <groupId>com.amazonaws</groupId>
+      <artifactId>aws-java-sdk-dynamodb</artifactId>
+      <version>1.11.213</version>
+    </dependency>
+  </dependencies>
 </project>
diff --git a/features/karaf/openhab-addons/src/main/feature/feature.xml b/features/karaf/openhab-addons/src/main/feature/feature.xml
index 27e915eb4ad8a..b78354dd7ab87 100644
--- a/features/karaf/openhab-addons/src/main/feature/feature.xml
+++ b/features/karaf/openhab-addons/src/main/feature/feature.xml
@@ -912,8 +912,10 @@
+    <feature>wrap</feature>
     <feature>openhab-runtime-base</feature>
     <bundle>mvn:org.openhab.persistence/org.openhab.persistence.dynamodb/${project.version}</bundle>
+    <bundle>wrap:mvn:com.amazonaws/aws-java-sdk-dynamodb/1.11.213$Bundle-Name=AWS%20Java%20SDK%20For%20Amazon%20DynamoDB&Bundle-SymbolicName=aws-java-sdk-dynamodb&Bundle-Version=1.11.213</bundle>
     <configfile finalname="${openhab.conf}/services/dynamodb.cfg">mvn:${project.groupId}/openhab-addons-external/${project.version}/cfg/dynamodb</configfile>

From b7c5de7fd928ef972f69a58f7be7814a9c2e92ad Mon Sep 17 00:00:00 2001
From: Sami Salonen
Date: Sun, 28 Apr 2019 13:32:25 +0300
Subject: [PATCH 3/4] [dynamodb] formatting

Signed-off-by: Sami Salonen
---
 bundles/org.openhab.persistence.dynamodb/pom.xml | 16 ++++++++--------
 1 file changed, 8 insertions(+), 8 deletions(-)

diff --git a/bundles/org.openhab.persistence.dynamodb/pom.xml b/bundles/org.openhab.persistence.dynamodb/pom.xml
index 0b19540a23c05..7edc7a1c86dcb 100644
--- a/bundles/org.openhab.persistence.dynamodb/pom.xml
+++ b/bundles/org.openhab.persistence.dynamodb/pom.xml
@@ -1,17 +1,17 @@
-  <modelVersion>4.0.0</modelVersion>
+	<modelVersion>4.0.0</modelVersion>
 
-  <parent>
-    <groupId>org.openhab.addons.bundles</groupId>
-    <artifactId>org.openhab.addons.reactor.bundles</artifactId>
-    <version>2.5.0-SNAPSHOT</version>
-  </parent>
+	<parent>
+		<groupId>org.openhab.addons.bundles</groupId>
+		<artifactId>org.openhab.addons.reactor.bundles</artifactId>
+		<version>2.5.0-SNAPSHOT</version>
+	</parent>
 
-  <artifactId>org.openhab.persistence.dynamodb</artifactId>
+	<artifactId>org.openhab.persistence.dynamodb</artifactId>
 
-  <name>openHAB Add-ons :: Bundles :: DynamoDB Persistence</name>
+	<name>openHAB Add-ons :: Bundles :: DynamoDB Persistence</name>

From 2269e7145f5e06bd88712c73cb936730e8238fe8 Mon Sep 17 00:00:00 2001
From: Kai Kreuzer
Date: Sun, 28 Apr 2019 18:17:34 +0200
Subject: [PATCH 4/4] Update feature.xml

---
 features/karaf/openhab-addons/src/main/feature/feature.xml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/features/karaf/openhab-addons/src/main/feature/feature.xml b/features/karaf/openhab-addons/src/main/feature/feature.xml
index b78354dd7ab87..11b382ad1cfce 100644
--- a/features/karaf/openhab-addons/src/main/feature/feature.xml
+++ b/features/karaf/openhab-addons/src/main/feature/feature.xml
@@ -912,7 +912,7 @@
-    <feature>wrap</feature>
+    <feature prerequisite="true">wrap</feature>
     <feature>openhab-runtime-base</feature>
     <bundle>mvn:org.openhab.persistence/org.openhab.persistence.dynamodb/${project.version}</bundle>
     <bundle>wrap:mvn:com.amazonaws/aws-java-sdk-dynamodb/1.11.213$Bundle-Name=AWS%20Java%20SDK%20For%20Amazon%20DynamoDB&Bundle-SymbolicName=aws-java-sdk-dynamodb&Bundle-Version=1.11.213</bundle>