[dynamodb] sync-up with openhab1-addons repo and bnd dependencies #4

Merged · 4 commits · Apr 28, 2019
64 changes: 47 additions & 17 deletions bundles/org.openhab.persistence.dynamodb/README.md
@@ -55,21 +55,21 @@ This service can be configured in the file `services/dynamodb.cfg`.

### Basic configuration

| Property | Default | Required | Description |
| --------- | ------- | :------: | ------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| accessKey | | Yes | access key as shown in [Setting up Amazon account](#setting-up-amazon-account). |
| secretKey | | Yes | secret key as shown in [Setting up Amazon account](#setting-up-amazon-account). |
| region | | Yes | AWS region ID as described in [Setting up Amazon account](#setting-up-amazon-account). The region needs to match the region that was used to create the user. |
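
For example, a minimal `services/dynamodb.cfg` using these properties (the values shown are placeholders):

````
accessKey=ACCESS-KEY
secretKey=SECRET-KEY
region=eu-west-1
````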

### Configuration Using Credentials File

Alternatively, instead of specifying `accessKey` and `secretKey`, one can point the service to an AWS credentials profile file.

| Property | Default | Required | Description |
|----------|---------|:--------:|-------------|
| profilesConfigFile | | Yes | path to the credentials file. For example, `/etc/openhab2/aws_creds`. Please note that the user that runs openHAB must have appropriate read rights to the credentials file. For more details on the Amazon credentials file format, see the [Amazon documentation](https://docs.aws.amazon.com/cli/latest/userguide/cli-chap-getting-started.html). |
| profile | | Yes | name of the profile to use |
| region | | Yes | AWS region ID as described in Step 2 in [Setting up Amazon account](#setting-up-amazon-account). The region needs to match the region that was used to create the user. |

Example of service configuration file (`services/dynamodb.cfg`):

@@ -91,11 +91,15 @@ aws_secret_access_key=testSecretKey
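
The collapsed hunk above hides the unchanged example files. A minimal sketch consistent with the visible context (the profile name `fooprofile` and the region value are illustrative):

````
profilesConfigFile=/etc/openhab2/aws_creds
profile=fooprofile
region=eu-west-1
````

And the matching credentials file (`/etc/openhab2/aws_creds`), whose last line is visible above:

````
[fooprofile]
aws_access_key_id=testAccessKey
aws_secret_access_key=testSecretKey
````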

In addition to the configuration properties above, the following are also available:

| Property | Default | Required | Description |
| -------------------------- | ---------- | :------: | -------------------------------------------------------------------------------------------------- |
| readCapacityUnits | 1 | No | read capacity for the created tables |
| writeCapacityUnits | 1 | No | write capacity for the created tables |
| tablePrefix | `openhab-` | No | table prefix used in the name of created tables |
| bufferCommitIntervalMillis | 1000 | No | Interval to commit (write) buffered data. In milliseconds. |
| bufferSize | 1000 | No | Internal buffer size which is used to batch writes to DynamoDB every `bufferCommitIntervalMillis`. |

Typically you should not need to modify parameters related to buffering.
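
Should you need to override them anyway, these properties go in `services/dynamodb.cfg` like any other (the values below are examples only):

````
readCapacityUnits=2
writeCapacityUnits=2
bufferCommitIntervalMillis=500
bufferSize=2000
````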

Refer to Amazon documentation on [provisioned throughput](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/HowItWorks.ProvisionedThroughput.html) for details on read/write capacity.

@@ -105,10 +109,20 @@ All item- and event-related configuration is done in the file `persistence/dynamodb.persist`.

### Table Creation

When an item is persisted via this service, a table is created (if necessary). Currently, the service will create at most two tables for different item types. The tables will be named `<tablePrefix><item-type>`, where `<item-type>` is either `bigdecimal` (numeric items) or `string` (string and complex items).

Each table will have three columns: `itemname` (item name), `timeutc` (in ISO 8601 format with millisecond accuracy), and `itemstate` (either a number or string representing item state).
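
For example, with the default `tablePrefix`, a `Number` item is persisted as a row in `openhab-bigdecimal`, while a `String` item ends up in `openhab-string`.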

## Buffering

By default, the service is asynchronous: data is not written immediately to DynamoDB but is instead buffered in memory.
The size of the buffer, in terms of datapoints, can be configured with `bufferSize`.
Every `bufferCommitIntervalMillis`, the whole buffer of data is flushed to DynamoDB.

It is recommended to keep buffering enabled, since the synchronous behaviour (writing data immediately) might have an adverse impact on the whole system when many items are persisted at the same time. Buffering can be disabled by setting `bufferSize` to zero.

The defaults should be suitable in many use cases.
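
For example, to disable buffering entirely and write each datapoint synchronously, set the following in `services/dynamodb.cfg`:

````
bufferSize=0
````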

### Caveats

When the tables are created, the read/write capacity is configured according to the service configuration. However, the service does not modify the capacity of existing tables. As a workaround, you can modify the read/write capacity of existing tables using the [Amazon console](https://aws.amazon.com/console/).

@@ -128,4 +142,20 @@
7. Generate `.classpath` entries
`ls lib/*.jar | python -c "import sys;pre='<classpathentry exported=\"true\" kind=\"lib\" path=\"';post='\"/>'; print('\\t' + pre + (post + '\\n\\t' + pre).join(map(str.strip, sys.stdin.readlines())) + post)"`
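
For a single `lib/foo.jar` (an illustrative file name), the one-liner prints a line of the form:

````
	<classpathentry exported="true" kind="lib" path="lib/foo.jar"/>
````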

After these changes, it's good practice to run the integration tests (against live AWS DynamoDB) in the `org.openhab.persistence.dynamodb.test` bundle. See the README.md in the test bundle for more information on how to execute the tests.

### Running integration tests

To run integration tests, one needs to provide AWS credentials.

Eclipse instructions:

1. Run all tests (in the package `org.openhab.persistence.dynamodb.internal`) as JUnit tests
2. Configure the run configuration, and open the Arguments tab
3. In VM arguments, provide the credentials for AWS:
````
-DDYNAMODBTEST_REGION=REGION-ID
-DDYNAMODBTEST_ACCESS=ACCESS-KEY
-DDYNAMODBTEST_SECRET=SECRET
````
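
Outside Eclipse, the same system properties can be passed on the command line; for example (a sketch, assuming the tests are run through Maven and that Surefire forwards the `-D` properties to the test JVM):

````
mvn test -DDYNAMODBTEST_REGION=REGION-ID -DDYNAMODBTEST_ACCESS=ACCESS-KEY -DDYNAMODBTEST_SECRET=SECRET
````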

The tests will create tables with the prefix `dynamodb-integration-tests-`. Note that when the tests start, all data is removed from those tables!
23 changes: 15 additions & 8 deletions bundles/org.openhab.persistence.dynamodb/pom.xml
@@ -1,16 +1,23 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">

  <modelVersion>4.0.0</modelVersion>

  <parent>
    <groupId>org.openhab.addons.bundles</groupId>
    <artifactId>org.openhab.addons.reactor.bundles</artifactId>
    <version>2.5.0-SNAPSHOT</version>
  </parent>

  <artifactId>org.openhab.persistence.dynamodb</artifactId>

  <name>openHAB Add-ons :: Bundles :: DynamoDB Persistence</name>

  <dependencies>
    <dependency>
      <groupId>com.amazonaws</groupId>
      <artifactId>aws-java-sdk-dynamodb</artifactId>
      <version>1.11.213</version>
    </dependency>
  </dependencies>
</project>
103 changes: 103 additions & 0 deletions bundles/org.openhab.persistence.dynamodb/src/main/java/org/openhab/persistence/dynamodb/internal/AbstractBufferedPersistenceService.java
@@ -0,0 +1,103 @@
package org.openhab.persistence.dynamodb.internal;

import java.util.Date;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

import org.openhab.core.items.Item;
import org.openhab.core.persistence.PersistenceService;
import org.openhab.core.types.State;
import org.openhab.core.types.UnDefType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

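/**
 * Base class for persistence services that buffer datapoints in memory and write them out in
 * batches. Subclasses define how an item state is converted to the persisted type {@code T},
 * when the service is ready to store, and how the buffered data is flushed to the backing store.
 */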
public abstract class AbstractBufferedPersistenceService<T> implements PersistenceService {

private static final long BUFFER_OFFER_TIMEOUT_MILLIS = 500;

private final Logger logger = LoggerFactory.getLogger(AbstractBufferedPersistenceService.class);
protected BlockingQueue<T> buffer;

private boolean writeImmediately;

protected void resetWithBufferSize(int bufferSize) {
int capacity = Math.max(1, bufferSize);
buffer = new ArrayBlockingQueue<T>(capacity, true);
writeImmediately = bufferSize == 0;
}

protected abstract T persistenceItemFromState(String name, State state, Date time);

protected abstract boolean isReadyToStore();

protected abstract void flushBufferedData();

@Override
public void store(Item item) {
store(item, null);
}

@Override
public void store(Item item, String alias) {
long storeStart = System.currentTimeMillis();
String uuid = UUID.randomUUID().toString();
if (item.getState() instanceof UnDefType) {
logger.debug("Undefined item state received. Not storing item {}.", item.getName());
return;
}
if (!isReadyToStore()) {
return;
}
if (buffer == null) {
throw new IllegalStateException("Buffer not initialized with resetWithBufferSize. Bug?");
}
Date time = new Date(storeStart);
String realName = item.getName();
String name = (alias != null) ? alias : realName;
State state = item.getState();
T persistenceItem = persistenceItemFromState(name, state, time);
logger.trace("store() called with item {}, which was converted to {} [{}]", item, persistenceItem, uuid);
if (writeImmediately) {
logger.debug("Writing immediately item {} [{}]", realName, uuid);
// We want to write everything immediately
// Synchronous behaviour to ensure buffer does not get full.
synchronized (this) {
boolean buffered = addToBuffer(persistenceItem);
assert buffered;
flushBufferedData();
}
} else {
long bufferStart = System.currentTimeMillis();
boolean buffered = addToBuffer(persistenceItem);
if (buffered) {
logger.debug("Buffered item {} in {} ms. Total time for store(): {} [{}]", realName,
System.currentTimeMillis() - bufferStart, System.currentTimeMillis() - storeStart, uuid);
} else {
logger.debug(
"Buffer is full. Writing buffered data immediately and trying again. Consider increasing bufferSize");
// Buffer is full, commit it immediately
flushBufferedData();
boolean buffered2 = addToBuffer(persistenceItem);
if (buffered2) {
logger.debug("Buffered item in {} ms (2nd try, flushed buffer in-between) [{}]",
System.currentTimeMillis() - bufferStart, uuid);
} else {
// The unlikely case happened -- buffer got full again immediately
logger.warn("Buffering failed for the second time -- Too small bufferSize? Discarding data [{}]",
uuid);
}
}
}
}

protected boolean addToBuffer(T persistenceItem) {
try {
return buffer.offer(persistenceItem, BUFFER_OFFER_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
logger.warn("Interrupted when trying to buffer data! Dropping data");
return false;
}
}
}
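
To make the contract concrete, here is a minimal hypothetical subclass (the class and its in-memory behaviour are invented for this sketch; the real subclass in this bundle is the DynamoDB persistence service, which additionally schedules `flushBufferedData()` every `bufferCommitIntervalMillis`):

package org.openhab.persistence.dynamodb.internal;

import java.util.ArrayList;
import java.util.Date;
import java.util.List;

import org.openhab.core.types.State;

// Hypothetical example class -- invented for illustration, not part of this PR.
public class InMemoryBufferedPersistenceService extends AbstractBufferedPersistenceService<String> {

    // Collects "flushed" datapoints in memory instead of writing to a real database.
    private final List<String> written = new ArrayList<>();

    public InMemoryBufferedPersistenceService() {
        // Buffer up to 100 datapoints; store() flushes early if the buffer fills up.
        resetWithBufferSize(100);
    }

    @Override
    protected String persistenceItemFromState(String name, State state, Date time) {
        // Convert the item state into the representation the backing store expects.
        return name + "=" + state + "@" + time.getTime();
    }

    @Override
    protected boolean isReadyToStore() {
        // A real implementation would verify configuration and connectivity here.
        return true;
    }

    @Override
    protected void flushBufferedData() {
        // Drain everything currently buffered and write it out as one batch.
        List<String> batch = new ArrayList<>();
        buffer.drainTo(batch);
        written.addAll(batch);
    }

    // getName() comes from the openHAB 1.x PersistenceService interface.
    @Override
    public String getName() {
        return "in-memory-example";
    }
}
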
bundles/org.openhab.persistence.dynamodb/src/main/java/org/openhab/persistence/dynamodb/internal/DynamoDBConfig.java
@@ -31,6 +31,8 @@ public class DynamoDBConfig {
public static final boolean DEFAULT_CREATE_TABLE_ON_DEMAND = true;
public static final long DEFAULT_READ_CAPACITY_UNITS = 1;
public static final long DEFAULT_WRITE_CAPACITY_UNITS = 1;
public static final long DEFAULT_BUFFER_COMMIT_INTERVAL_MILLIS = 1000;
public static final int DEFAULT_BUFFER_SIZE = 1000;

private static final Logger logger = LoggerFactory.getLogger(DynamoDBConfig.class);

@@ -40,6 +42,8 @@ public class DynamoDBConfig {
private boolean createTable = DEFAULT_CREATE_TABLE_ON_DEMAND;
private long readCapacityUnits = DEFAULT_READ_CAPACITY_UNITS;
private long writeCapacityUnits = DEFAULT_WRITE_CAPACITY_UNITS;
private long bufferCommitIntervalMillis = DEFAULT_BUFFER_COMMIT_INTERVAL_MILLIS;
private int bufferSize = DEFAULT_BUFFER_SIZE;

/**
*
@@ -117,21 +121,42 @@ public static DynamoDBConfig fromConfig(Map<String, Object> config) {
writeCapacityUnits = Long.parseLong(writeCapacityUnitsParam);
}

final long bufferCommitIntervalMillis;
String bufferCommitIntervalMillisParam = (String) config.get("bufferCommitIntervalMillis");
if (isBlank(bufferCommitIntervalMillisParam)) {
logger.debug("Buffer commit interval millis: {}", DEFAULT_BUFFER_COMMIT_INTERVAL_MILLIS);
bufferCommitIntervalMillis = DEFAULT_BUFFER_COMMIT_INTERVAL_MILLIS;
} else {
bufferCommitIntervalMillis = Long.parseLong(bufferCommitIntervalMillisParam);
}

final int bufferSize;
String bufferSizeParam = (String) config.get("bufferSize");
if (isBlank(bufferSizeParam)) {
logger.debug("Buffer size: {}", DEFAULT_BUFFER_SIZE);
bufferSize = DEFAULT_BUFFER_SIZE;
} else {
bufferSize = Integer.parseInt(bufferSizeParam);
}

return new DynamoDBConfig(region, credentials, table, createTable, readCapacityUnits, writeCapacityUnits,
bufferCommitIntervalMillis, bufferSize);
} catch (Exception e) {
logger.error("Error with configuration", e);
return null;
}
}

public DynamoDBConfig(Regions region, AWSCredentials credentials, String table, boolean createTable,
long readCapacityUnits, long writeCapacityUnits, long bufferCommitIntervalMillis, int bufferSize) {
this.region = region;
this.credentials = credentials;
this.tablePrefix = table;
this.createTable = createTable;
this.readCapacityUnits = readCapacityUnits;
this.writeCapacityUnits = writeCapacityUnits;
this.bufferCommitIntervalMillis = bufferCommitIntervalMillis;
this.bufferSize = bufferSize;
}

public AWSCredentials getCredentials() {
@@ -158,6 +183,14 @@ public long getWriteCapacityUnits() {
return writeCapacityUnits;
}

public long getBufferCommitIntervalMillis() {
return bufferCommitIntervalMillis;
}

public int getBufferSize() {
return bufferSize;
}

private static void invalidRegionLogHelp(String region) {
Regions[] regions = Regions.values();
String[] regionNames = new String[regions.length];