forked from openhab/openhab-addons
-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[dynamodb] sync-up with openhab1-addons repo and bnd dependencies (#4)
* [dynamodb] latest changes synced from openhab1-addons repo In addition, removed dependency to com.google.common.collect. Corresponding PRs: openhab#5847 and openhab#5826 Signed-off-by: Sami Salonen <[email protected]>
- Loading branch information
1 parent
7b9fa26
commit c465ffd
Showing
10 changed files
with
672 additions
and
246 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,16 +1,23 @@ | ||
<?xml version="1.0" encoding="UTF-8"?> | ||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> | ||
|
||
<modelVersion>4.0.0</modelVersion> | ||
<modelVersion>4.0.0</modelVersion> | ||
|
||
<parent> | ||
<groupId>org.openhab.addons.bundles</groupId> | ||
<artifactId>org.openhab.addons.reactor.bundles</artifactId> | ||
<version>2.5.0-SNAPSHOT</version> | ||
</parent> | ||
<parent> | ||
<groupId>org.openhab.addons.bundles</groupId> | ||
<artifactId>org.openhab.addons.reactor.bundles</artifactId> | ||
<version>2.5.0-SNAPSHOT</version> | ||
</parent> | ||
|
||
<artifactId>org.openhab.persistence.dynamodb</artifactId> | ||
<artifactId>org.openhab.persistence.dynamodb</artifactId> | ||
|
||
<name>openHAB Add-ons :: Bundles :: DynamoDB Persistence</name> | ||
<name>openHAB Add-ons :: Bundles :: DynamoDB Persistence</name> | ||
|
||
<dependencies> | ||
<dependency> | ||
<groupId>com.amazonaws</groupId> | ||
<artifactId>aws-java-sdk-dynamodb</artifactId> | ||
<version>1.11.213</version> | ||
</dependency> | ||
</dependencies> | ||
</project> |
103 changes: 103 additions & 0 deletions
103
...in/java/org/openhab/persistence/dynamodb/internal/AbstractBufferedPersistenceService.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,103 @@ | ||
package org.openhab.persistence.dynamodb.internal; | ||
|
||
import java.util.Date; | ||
import java.util.UUID; | ||
import java.util.concurrent.ArrayBlockingQueue; | ||
import java.util.concurrent.BlockingQueue; | ||
import java.util.concurrent.TimeUnit; | ||
|
||
import org.openhab.core.items.Item; | ||
import org.openhab.core.persistence.PersistenceService; | ||
import org.openhab.core.types.State; | ||
import org.openhab.core.types.UnDefType; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
public abstract class AbstractBufferedPersistenceService<T> implements PersistenceService { | ||
|
||
private static final long BUFFER_OFFER_TIMEOUT_MILLIS = 500; | ||
|
||
private final Logger logger = LoggerFactory.getLogger(AbstractBufferedPersistenceService.class); | ||
protected BlockingQueue<T> buffer; | ||
|
||
private boolean writeImmediately; | ||
|
||
protected void resetWithBufferSize(int bufferSize) { | ||
int capacity = Math.max(1, bufferSize); | ||
buffer = new ArrayBlockingQueue<T>(capacity, true); | ||
writeImmediately = bufferSize == 0; | ||
} | ||
|
||
protected abstract T persistenceItemFromState(String name, State state, Date time); | ||
|
||
protected abstract boolean isReadyToStore(); | ||
|
||
protected abstract void flushBufferedData(); | ||
|
||
@Override | ||
public void store(Item item) { | ||
store(item, null); | ||
} | ||
|
||
@Override | ||
public void store(Item item, String alias) { | ||
long storeStart = System.currentTimeMillis(); | ||
String uuid = UUID.randomUUID().toString(); | ||
if (item.getState() instanceof UnDefType) { | ||
logger.debug("Undefined item state received. Not storing item {}.", item.getName()); | ||
return; | ||
} | ||
if (!isReadyToStore()) { | ||
return; | ||
} | ||
if (buffer == null) { | ||
throw new IllegalStateException("Buffer not initialized with resetWithBufferSize. Bug?"); | ||
} | ||
Date time = new Date(storeStart); | ||
String realName = item.getName(); | ||
String name = (alias != null) ? alias : realName; | ||
State state = item.getState(); | ||
T persistenceItem = persistenceItemFromState(name, state, time); | ||
logger.trace("store() called with item {}, which was converted to {} [{}]", item, persistenceItem, uuid); | ||
if (writeImmediately) { | ||
logger.debug("Writing immediately item {} [{}]", realName, uuid); | ||
// We want to write everything immediately | ||
// Synchronous behaviour to ensure buffer does not get full. | ||
synchronized (this) { | ||
boolean buffered = addToBuffer(persistenceItem); | ||
assert buffered; | ||
flushBufferedData(); | ||
} | ||
} else { | ||
long bufferStart = System.currentTimeMillis(); | ||
boolean buffered = addToBuffer(persistenceItem); | ||
if (buffered) { | ||
logger.debug("Buffered item {} in {} ms. Total time for store(): {} [{}]", realName, | ||
System.currentTimeMillis() - bufferStart, System.currentTimeMillis() - storeStart, uuid); | ||
} else { | ||
logger.debug( | ||
"Buffer is full. Writing buffered data immediately and trying again. Consider increasing bufferSize"); | ||
// Buffer is full, commit it immediately | ||
flushBufferedData(); | ||
boolean buffered2 = addToBuffer(persistenceItem); | ||
if (buffered2) { | ||
logger.debug("Buffered item in {} ms (2nd try, flushed buffer in-between) [{}]", | ||
System.currentTimeMillis() - bufferStart, uuid); | ||
} else { | ||
// The unlikely case happened -- buffer got full again immediately | ||
logger.warn("Buffering failed for the second time -- Too small bufferSize? Discarding data [{}]", | ||
uuid); | ||
} | ||
} | ||
} | ||
} | ||
|
||
protected boolean addToBuffer(T persistenceItem) { | ||
try { | ||
return buffer.offer(persistenceItem, BUFFER_OFFER_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS); | ||
} catch (InterruptedException e) { | ||
logger.warn("Interrupted when trying to buffer data! Dropping data"); | ||
return false; | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.