diff --git a/bundles/org.openhab.core.automation.module.script/src/main/java/org/openhab/core/automation/module/script/defaultscope/ScriptBusEvent.java b/bundles/org.openhab.core.automation.module.script/src/main/java/org/openhab/core/automation/module/script/defaultscope/ScriptBusEvent.java index bb1bbcd43a9..61bf878e5c3 100644 --- a/bundles/org.openhab.core.automation.module.script/src/main/java/org/openhab/core/automation/module/script/defaultscope/ScriptBusEvent.java +++ b/bundles/org.openhab.core.automation.module.script/src/main/java/org/openhab/core/automation/module/script/defaultscope/ScriptBusEvent.java @@ -12,6 +12,7 @@ */ package org.openhab.core.automation.module.script.defaultscope; +import java.time.ZonedDateTime; import java.util.Map; import org.eclipse.jdt.annotation.NonNullByDefault; @@ -19,6 +20,7 @@ import org.openhab.core.items.Item; import org.openhab.core.types.Command; import org.openhab.core.types.State; +import org.openhab.core.types.TimeSeries; /** * The static methods of this class are made available as functions in the scripts. @@ -98,7 +100,6 @@ public interface ScriptBusEvent { /** * Posts a status update for a specified item to the event bus. - * t * * @param item the item to send the status update for * @param state the new state of the item @@ -106,6 +107,26 @@ public interface ScriptBusEvent { @Nullable Object postUpdate(@Nullable Item item, @Nullable State state); + /** + * Sends a time series to the event bus + * + * @param item the item to send the time series for + * @param timeSeries a {@link TimeSeries} containing policy and values + */ + @Nullable + Object sendTimeSeries(@Nullable Item item, @Nullable TimeSeries timeSeries); + + /** + * Sends a time series to the event bus + * + * @param itemName the name of the item to send the status update for + * @param values a {@link Map} containing the timeseries, composed of pairs of {@link ZonedDateTime} and + * {@link State} + * @param policy either ADD or REPLACE + */ + @Nullable + Object sendTimeSeries(@Nullable String itemName, @Nullable Map values, String policy); + /** * Stores the current states for a list of items in a map. * A group item is not itself put into the map, but instead all its members. diff --git a/bundles/org.openhab.core.automation.module.script/src/main/java/org/openhab/core/automation/module/script/internal/defaultscope/ScriptBusEventImpl.java b/bundles/org.openhab.core.automation.module.script/src/main/java/org/openhab/core/automation/module/script/internal/defaultscope/ScriptBusEventImpl.java index 72490aff17b..11e51c39e03 100644 --- a/bundles/org.openhab.core.automation.module.script/src/main/java/org/openhab/core/automation/module/script/internal/defaultscope/ScriptBusEventImpl.java +++ b/bundles/org.openhab.core.automation.module.script/src/main/java/org/openhab/core/automation/module/script/internal/defaultscope/ScriptBusEventImpl.java @@ -12,6 +12,7 @@ */ package org.openhab.core.automation.module.script.internal.defaultscope; +import java.time.ZonedDateTime; import java.util.HashMap; import java.util.Map; import java.util.Map.Entry; @@ -27,6 +28,7 @@ import org.openhab.core.items.events.ItemEventFactory; import org.openhab.core.types.Command; import org.openhab.core.types.State; +import org.openhab.core.types.TimeSeries; import org.openhab.core.types.TypeParser; import org.slf4j.LoggerFactory; @@ -153,6 +155,31 @@ public void dispose() { return null; } + @Override + public @Nullable Object sendTimeSeries(@Nullable Item item, @Nullable TimeSeries timeSeries) { + EventPublisher eventPublisher1 = this.eventPublisher; + if (eventPublisher1 != null && item != null && timeSeries != null) { + eventPublisher1.post(ItemEventFactory.createTimeSeriesEvent(item.getName(), timeSeries, null)); + } + return null; + } + + @Override + public @Nullable Object sendTimeSeries(@Nullable String itemName, @Nullable Map values, + @Nullable String policy) { + EventPublisher eventPublisher1 = this.eventPublisher; + if (eventPublisher1 != null && itemName != null && values != null && policy != null) { + try { + TimeSeries timeSeries = new TimeSeries(TimeSeries.Policy.valueOf(policy)); + values.forEach((key, value) -> timeSeries.add(key.toInstant(), value)); + eventPublisher1.post(ItemEventFactory.createTimeSeriesEvent(itemName, timeSeries, null)); + } catch (IllegalArgumentException e) { + LoggerFactory.getLogger(ScriptBusEventImpl.class).warn("Policy '{}' does not exist.", policy); + } + } + return null; + } + @Override public Map storeStates(Item @Nullable... items) { Map statesMap = new HashMap<>(); diff --git a/bundles/org.openhab.core.model.persistence/src/org/openhab/core/model/persistence/scoping/GlobalStrategies.java b/bundles/org.openhab.core.model.persistence/src/org/openhab/core/model/persistence/scoping/GlobalStrategies.java index 5d4e901edcf..401feb97b06 100644 --- a/bundles/org.openhab.core.model.persistence/src/org/openhab/core/model/persistence/scoping/GlobalStrategies.java +++ b/bundles/org.openhab.core.model.persistence/src/org/openhab/core/model/persistence/scoping/GlobalStrategies.java @@ -43,4 +43,10 @@ public String getName() { return "restoreOnStartup"; }; }; + public static final Strategy FORECAST = new StrategyImpl() { + @Override + public String getName() { + return "forecast"; + }; + }; } diff --git a/bundles/org.openhab.core.model.persistence/src/org/openhab/core/model/persistence/scoping/PersistenceGlobalScopeProvider.java b/bundles/org.openhab.core.model.persistence/src/org/openhab/core/model/persistence/scoping/PersistenceGlobalScopeProvider.java index 746d932a13e..d070495fa14 100644 --- a/bundles/org.openhab.core.model.persistence/src/org/openhab/core/model/persistence/scoping/PersistenceGlobalScopeProvider.java +++ b/bundles/org.openhab.core.model.persistence/src/org/openhab/core/model/persistence/scoping/PersistenceGlobalScopeProvider.java @@ -38,6 +38,7 @@ public class PersistenceGlobalScopeProvider extends AbstractGlobalScopeProvider res.getContents().add(GlobalStrategies.UPDATE); res.getContents().add(GlobalStrategies.CHANGE); res.getContents().add(GlobalStrategies.RESTORE); + res.getContents().add(GlobalStrategies.FORECAST); } @Override diff --git a/bundles/org.openhab.core.model.script/src/org/openhab/core/model/script/actions/BusEvent.java b/bundles/org.openhab.core.model.script/src/org/openhab/core/model/script/actions/BusEvent.java index 15ccd3bb146..bf07cd620d2 100644 --- a/bundles/org.openhab.core.model.script/src/org/openhab/core/model/script/actions/BusEvent.java +++ b/bundles/org.openhab.core.model.script/src/org/openhab/core/model/script/actions/BusEvent.java @@ -12,11 +12,13 @@ */ package org.openhab.core.model.script.actions; +import java.time.ZonedDateTime; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import org.eclipse.jdt.annotation.Nullable; import org.openhab.core.events.EventPublisher; import org.openhab.core.items.GroupItem; import org.openhab.core.items.Item; @@ -26,6 +28,7 @@ import org.openhab.core.model.script.ScriptServiceUtil; import org.openhab.core.types.Command; import org.openhab.core.types.State; +import org.openhab.core.types.TimeSeries; import org.openhab.core.types.TypeParser; import org.slf4j.LoggerFactory; @@ -146,7 +149,7 @@ public static Object postUpdate(Item item, String stateAsString) { * Posts a status update for a specified item to the event bus. * * @param itemName the name of the item to send the status update for - * @param stateAsString the new state of the item + * @param stateString the new state of the item */ public static Object postUpdate(String itemName, String stateString) { ItemRegistry registry = ScriptServiceUtil.getItemRegistry(); @@ -169,6 +172,43 @@ public static Object postUpdate(String itemName, String stateString) { return null; } + /** + * Sends a time series to the event bus + * + * @param item the item to send the time series for + * @param timeSeries a {@link TimeSeries} containing policy and values + */ + public static Object sendTimeSeries(@Nullable Item item, @Nullable TimeSeries timeSeries) { + EventPublisher eventPublisher1 = ScriptServiceUtil.getEventPublisher(); + if (eventPublisher1 != null && item != null && timeSeries != null) { + eventPublisher1.post(ItemEventFactory.createTimeSeriesEvent(item.getName(), timeSeries, null)); + } + return null; + } + + /** + * Sends a time series to the event bus + * + * @param itemName the name of the item to send the status update for + * @param values a {@link Map} containing the timeseries, composed of pairs of {@link ZonedDateTime} and + * {@link State} + * @param policy either ADD or REPLACE + */ + public static Object sendTimeSeries(@Nullable String itemName, @Nullable Map values, + String policy) { + EventPublisher eventPublisher1 = ScriptServiceUtil.getEventPublisher(); + if (eventPublisher1 != null && itemName != null && values != null && policy != null) { + try { + TimeSeries timeSeries = new TimeSeries(TimeSeries.Policy.valueOf(policy)); + values.forEach((key, value) -> timeSeries.add(key.toInstant(), value)); + eventPublisher1.post(ItemEventFactory.createTimeSeriesEvent(itemName, timeSeries, null)); + } catch (IllegalArgumentException e) { + LoggerFactory.getLogger(BusEvent.class).warn("Policy '{}' does not exist.", policy); + } + } + return null; + } + private static List getAcceptedDataTypeNames(Item item) { return item.getAcceptedDataTypes().stream().map(Class::getSimpleName).toList(); } @@ -232,6 +272,4 @@ public static Object restoreStates(Map statesMap) { } return null; } - - // static public JobKey timer(AbstractInstant instant, Object) } diff --git a/bundles/org.openhab.core.persistence/src/main/java/org/openhab/core/persistence/ModifiablePersistenceService.java b/bundles/org.openhab.core.persistence/src/main/java/org/openhab/core/persistence/ModifiablePersistenceService.java index 2feb4f4660f..920fde8d8fb 100644 --- a/bundles/org.openhab.core.persistence/src/main/java/org/openhab/core/persistence/ModifiablePersistenceService.java +++ b/bundles/org.openhab.core.persistence/src/main/java/org/openhab/core/persistence/ModifiablePersistenceService.java @@ -15,6 +15,7 @@ import java.time.ZonedDateTime; import org.eclipse.jdt.annotation.NonNullByDefault; +import org.eclipse.jdt.annotation.Nullable; import org.openhab.core.items.Item; import org.openhab.core.types.State; @@ -46,6 +47,25 @@ public interface ModifiablePersistenceService extends QueryablePersistenceServic */ void store(Item item, ZonedDateTime date, State state); + /** + *

+ * Stores the historic item value under a specified alias. This allows the item, time and value to be specified. + * + *

+ * Adding data with the same time as an existing record should update the current record value rather than adding a + * new record. + * + *

+ * Implementors should keep in mind that all registered {@link PersistenceService}s are called synchronously. Hence + * long running operations should be processed asynchronously. E.g. store adds things to a queue which + * is processed by some asynchronous workers (Quartz Job, Thread, etc.). + * + * @param item the data to be stored + * @param date the date of the record + * @param state the state to be recorded + */ + void store(Item item, ZonedDateTime date, State state, @Nullable String alias); + /** * Removes data associated with an item from a persistence service. * If all data is removed for the specified item, the persistence service should free any resources associated with diff --git a/bundles/org.openhab.core.persistence/src/main/java/org/openhab/core/persistence/internal/PersistenceManager.java b/bundles/org.openhab.core.persistence/src/main/java/org/openhab/core/persistence/internal/PersistenceManager.java index 040be881101..b34282fcfa1 100644 --- a/bundles/org.openhab.core.persistence/src/main/java/org/openhab/core/persistence/internal/PersistenceManager.java +++ b/bundles/org.openhab.core.persistence/src/main/java/org/openhab/core/persistence/internal/PersistenceManager.java @@ -12,6 +12,14 @@ */ package org.openhab.core.persistence.internal; +import static org.openhab.core.persistence.FilterCriteria.Ordering.ASCENDING; +import static org.openhab.core.persistence.strategy.PersistenceStrategy.Globals.FORECAST; +import static org.openhab.core.persistence.strategy.PersistenceStrategy.Globals.RESTORE; +import static org.openhab.core.persistence.strategy.PersistenceStrategy.Globals.UPDATE; + +import java.time.Instant; +import java.time.ZoneId; +import java.time.ZonedDateTime; import java.time.format.DateTimeFormatter; import java.util.Collection; import java.util.HashSet; @@ -23,6 +31,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.stream.Stream; @@ -37,8 +46,10 @@ import org.openhab.core.items.ItemRegistry; import org.openhab.core.items.ItemRegistryChangeListener; import org.openhab.core.items.StateChangeListener; +import org.openhab.core.items.TimeSeriesListener; import org.openhab.core.persistence.FilterCriteria; import org.openhab.core.persistence.HistoricItem; +import org.openhab.core.persistence.ModifiablePersistenceService; import org.openhab.core.persistence.PersistenceItemConfiguration; import org.openhab.core.persistence.PersistenceService; import org.openhab.core.persistence.QueryablePersistenceService; @@ -53,12 +64,14 @@ import org.openhab.core.persistence.strategy.PersistenceStrategy; import org.openhab.core.scheduler.CronScheduler; import org.openhab.core.scheduler.ScheduledCompletableFuture; +import org.openhab.core.scheduler.Scheduler; import org.openhab.core.service.ReadyMarker; import org.openhab.core.service.ReadyMarkerFilter; import org.openhab.core.service.ReadyService; import org.openhab.core.service.ReadyService.ReadyTracker; import org.openhab.core.service.StartLevelService; import org.openhab.core.types.State; +import org.openhab.core.types.TimeSeries; import org.openhab.core.types.UnDefType; import org.osgi.service.component.annotations.Activate; import org.osgi.service.component.annotations.Component; @@ -75,18 +88,19 @@ * @author Kai Kreuzer - Initial contribution * @author Markus Rathgeb - Separation of persistence core and model, drop Quartz usage. * @author Jan N. Klug - Refactored to use service configuration registry + * @author Jan N. Klug - Added time series support */ @Component(immediate = true) @NonNullByDefault public class PersistenceManager implements ItemRegistryChangeListener, StateChangeListener, ReadyTracker, - PersistenceServiceConfigurationRegistryChangeListener { - + PersistenceServiceConfigurationRegistryChangeListener, TimeSeriesListener { private final Logger logger = LoggerFactory.getLogger(PersistenceManager.class); private final ReadyMarker marker = new ReadyMarker("persistence", "restore"); // the scheduler used for timer events - private final CronScheduler scheduler; + private final CronScheduler cronScheduler; + private final Scheduler scheduler; private final ItemRegistry itemRegistry; private final SafeCaller safeCaller; private final ReadyService readyService; @@ -97,9 +111,11 @@ public class PersistenceManager implements ItemRegistryChangeListener, StateChan private final Map persistenceServiceContainers = new ConcurrentHashMap<>(); @Activate - public PersistenceManager(final @Reference CronScheduler scheduler, final @Reference ItemRegistry itemRegistry, - final @Reference SafeCaller safeCaller, final @Reference ReadyService readyService, + public PersistenceManager(final @Reference CronScheduler cronScheduler, final @Reference Scheduler scheduler, + final @Reference ItemRegistry itemRegistry, final @Reference SafeCaller safeCaller, + final @Reference ReadyService readyService, final @Reference PersistenceServiceConfigurationRegistry persistenceServiceConfigurationRegistry) { + this.cronScheduler = cronScheduler; this.scheduler = scheduler; this.itemRegistry = itemRegistry; this.safeCaller = safeCaller; @@ -118,6 +134,7 @@ protected void deactivate() { started = false; persistenceServiceContainers.values().forEach(PersistenceServiceContainer::cancelPersistJobs); + persistenceServiceContainers.values().forEach(PersistenceServiceContainer::cancelForecastJobs); // remove item state change listeners itemRegistry.stream().filter(GenericItem.class::isInstance) @@ -136,6 +153,7 @@ protected void addPersistenceService(PersistenceService persistenceService) { if (oldContainer != null) { // cancel all jobs if the persistence service is set and an old configuration is // already present oldContainer.cancelPersistJobs(); + oldContainer.cancelForecastJobs(); } if (started) { @@ -147,6 +165,7 @@ protected void removePersistenceService(PersistenceService persistenceService) { PersistenceServiceContainer container = persistenceServiceContainers.remove(persistenceService.getId()); if (container != null) { container.cancelPersistJobs(); + container.cancelForecastJobs(); } } @@ -237,61 +256,8 @@ private Iterable getAllItems(PersistenceItemConfiguration config) { return items; } - /** - * Handles the "restoreOnStartup" strategy for the item. - * If the item state is still undefined when entering this method, all persistence configurations are checked, - * if they have the "restoreOnStartup" strategy configured for the item. If so, the item state will be set - * to its last persisted value. - * - * @param item the item to restore the state for - */ - private void restoreItemStateIfNeeded(Item item) { - // get the last persisted state from the persistence service if no state is yet set - if (UnDefType.NULL.equals(item.getState()) && item instanceof GenericItem) { - List matchingContainers = persistenceServiceContainers.values().stream() // - .filter(container -> container.getPersistenceService() instanceof QueryablePersistenceService) // - .filter(container -> container.getMatchingConfigurations(PersistenceStrategy.Globals.RESTORE) - .anyMatch(itemConfig -> appliesToItem(itemConfig, item))) - .toList(); - - for (PersistenceServiceContainer container : matchingContainers) { - QueryablePersistenceService queryService = (QueryablePersistenceService) container - .getPersistenceService(); - FilterCriteria filter = new FilterCriteria().setItemName(item.getName()).setPageSize(1); - Iterable result = safeCaller.create(queryService, QueryablePersistenceService.class) - .onTimeout(() -> { - logger.warn("Querying persistence service '{}' to restore '{}' takes more than {}ms.", - queryService.getId(), item.getName(), SafeCaller.DEFAULT_TIMEOUT); - }) - .onException(e -> logger.error( - "Exception occurred while querying persistence service '{}' to restore '{}': {}", - queryService.getId(), item.getName(), e.getMessage(), e)) - .build().query(filter); - if (result == null) { - // in case of an exception or timeout, the safe caller returns null - continue; - } - Iterator it = result.iterator(); - if (it.hasNext()) { - HistoricItem historicItem = it.next(); - GenericItem genericItem = (GenericItem) item; - genericItem.removeStateChangeListener(this); - genericItem.setState(historicItem.getState()); - genericItem.addStateChangeListener(this); - if (logger.isDebugEnabled()) { - logger.debug("Restored item state from '{}' for item '{}' -> '{}'", - DateTimeFormatter.ISO_ZONED_DATE_TIME.format(historicItem.getTimestamp()), - item.getName(), historicItem.getState()); - } - return; - } - } - } - } - private void startEventHandling(PersistenceServiceContainer serviceContainer) { - serviceContainer.getMatchingConfigurations(PersistenceStrategy.Globals.RESTORE) - .forEach(itemConfig -> getAllItems(itemConfig).forEach(this::restoreItemStateIfNeeded)); + serviceContainer.restoreStatesAndScheduleForecastJobs(); serviceContainer.schedulePersistJobs(); } @@ -304,16 +270,19 @@ public void allItemsChanged(Collection oldItemNames) { @Override public void added(Item item) { - restoreItemStateIfNeeded(item); + persistenceServiceContainers.values().forEach(container -> container.addItem(item)); if (item instanceof GenericItem genericItem) { genericItem.addStateChangeListener(this); + genericItem.addTimeSeriesListener(this); } } @Override public void removed(Item item) { + persistenceServiceContainers.values().forEach(container -> container.removeItem(item.getName())); if (item instanceof GenericItem genericItem) { genericItem.removeStateChangeListener(this); + genericItem.removeTimeSeriesListener(this); } } @@ -333,6 +302,50 @@ public void stateUpdated(Item item, State state) { handleStateEvent(item, false); } + @Override + public void timeSeriesUpdated(Item item, TimeSeries timeSeries) { + if (timeSeries.size() == 0) { + // discard empty time series + return; + } + persistenceServiceContainers.values().stream() + .filter(psc -> psc.persistenceService instanceof ModifiablePersistenceService) + .forEach(container -> Stream + .concat(container.getMatchingConfigurations(UPDATE), + container.getMatchingConfigurations(FORECAST)) + .distinct().filter(itemConfig -> appliesToItem(itemConfig, item)).forEach(itemConfig -> { + ModifiablePersistenceService service = (ModifiablePersistenceService) container + .getPersistenceService(); + // remove old values if replace selected + if (timeSeries.getPolicy() == TimeSeries.Policy.REPLACE) { + ZonedDateTime begin = timeSeries.getBegin().atZone(ZoneId.systemDefault()); + ZonedDateTime end = timeSeries.getEnd().atZone(ZoneId.systemDefault()); + FilterCriteria removeFilter = new FilterCriteria().setItemName(item.getName()) + .setBeginDate(begin).setEndDate(end); + service.remove(removeFilter); + ScheduledCompletableFuture forecastJob = container.forecastJobs.get(item.getName()); + if (forecastJob != null && forecastJob.getScheduledTime().isAfter(begin) + && forecastJob.getScheduledTime().isBefore(end)) { + forecastJob.cancel(true); + container.forecastJobs.remove(item.getName()); + } + } + // update states + timeSeries.getStates().forEach( + e -> service.store(item, e.timestamp().atZone(ZoneId.systemDefault()), e.state())); + timeSeries.getStates().filter(s -> s.timestamp().isAfter(Instant.now())).findFirst() + .ifPresent(s -> { + ScheduledCompletableFuture forecastJob = container.forecastJobs + .get(item.getName()); + if (forecastJob == null || forecastJob.getScheduledTime() + .isAfter(s.timestamp().atZone(ZoneId.systemDefault()))) { + container.scheduleNextForecastForItem(item.getName(), s.timestamp(), + s.state()); + } + }); + })); + } + @Override public void onReadyMarkerAdded(ReadyMarker readyMarker) { ExecutorService scheduler = Executors.newSingleThreadExecutor(new NamedThreadFactory("persistenceManager")); @@ -381,7 +394,9 @@ public void updated(PersistenceServiceConfiguration oldElement, PersistenceServi private class PersistenceServiceContainer { private final PersistenceService persistenceService; - private final Set> jobs = new HashSet<>(); + private final Set> persistJobs = new HashSet<>(); + private final Map> forecastJobs = new ConcurrentHashMap<>(); + private final Map> strategyCache = new ConcurrentHashMap<>(); private PersistenceServiceConfiguration configuration; @@ -403,19 +418,25 @@ public PersistenceService getPersistenceService() { */ public void setConfiguration(@Nullable PersistenceServiceConfiguration configuration) { cancelPersistJobs(); + cancelForecastJobs(); this.configuration = Objects.requireNonNullElseGet(configuration, this::getDefaultConfig); + strategyCache.clear(); } /** * Get all item configurations from this service that match a certain strategy * * @param strategy the {@link PersistenceStrategy} to look for - * @return a @link Stream} of the result + * @return a {@link Stream} of the result */ public Stream getMatchingConfigurations(PersistenceStrategy strategy) { - boolean matchesDefaultStrategies = configuration.getDefaults().contains(strategy); - return configuration.getConfigs().stream().filter(itemConfig -> itemConfig.strategies().contains(strategy) - || (itemConfig.strategies().isEmpty() && matchesDefaultStrategies)); + return Objects.requireNonNull(strategyCache.computeIfAbsent(strategy, s -> { + boolean matchesDefaultStrategies = configuration.getDefaults().contains(strategy); + return configuration.getConfigs().stream() + .filter(itemConfig -> itemConfig.strategies().contains(strategy) + || (itemConfig.strategies().isEmpty() && matchesDefaultStrategies)) + .toList(); + }).stream()); } private PersistenceServiceConfiguration getDefaultConfig() { @@ -430,11 +451,19 @@ private PersistenceServiceConfiguration getDefaultConfig() { * Cancel all scheduled cron jobs / strategies for this service */ public void cancelPersistJobs() { - synchronized (jobs) { - jobs.forEach(job -> job.cancel(true)); - jobs.clear(); + synchronized (persistJobs) { + persistJobs.forEach(job -> job.cancel(true)); + persistJobs.clear(); + } + logger.debug("Removed scheduled cron jobs for persistence service '{}'", configuration.getUID()); + } + + public void cancelForecastJobs() { + synchronized (forecastJobs) { + forecastJobs.values().forEach(job -> job.cancel(true)); + forecastJobs.clear(); } - logger.debug("Removed scheduled cron job for persistence service '{}'", configuration.getUID()); + logger.debug("Removed scheduled forecast jobs for persistence service '{}'", configuration.getUID()); } /** @@ -446,7 +475,7 @@ public void schedulePersistJobs() { PersistenceCronStrategy cronStrategy = (PersistenceCronStrategy) strategy; String cronExpression = cronStrategy.getCronExpression(); List itemConfigs = getMatchingConfigurations(strategy).toList(); - jobs.add(scheduler.schedule(() -> persistJob(itemConfigs), cronExpression)); + persistJobs.add(cronScheduler.schedule(() -> persistJob(itemConfigs), cronExpression)); logger.debug("Scheduled strategy {} with cron expression {} for service {}", cronStrategy.getName(), cronExpression, configuration.getUID()); @@ -454,6 +483,108 @@ public void schedulePersistJobs() { }); } + public void restoreStatesAndScheduleForecastJobs() { + itemRegistry.getItems().forEach(this::addItem); + } + + public void addItem(Item item) { + if (persistenceService instanceof QueryablePersistenceService) { + if (UnDefType.NULL.equals(item.getState()) + && (getMatchingConfigurations(RESTORE) + .anyMatch(configuration -> appliesToItem(configuration, item))) + || getMatchingConfigurations(FORECAST) + .anyMatch(configuration -> appliesToItem(configuration, item))) { + restoreItemStateIfPossible(item); + } + if (getMatchingConfigurations(FORECAST).anyMatch(configuration -> appliesToItem(configuration, item))) { + scheduleNextPersistedForecastForItem(item.getName()); + + } + } + } + + public void removeItem(String itemName) { + ScheduledCompletableFuture job = forecastJobs.remove(itemName); + if (job != null) { + job.cancel(true); + } + } + + private void restoreItemStateIfPossible(Item item) { + QueryablePersistenceService queryService = (QueryablePersistenceService) persistenceService; + + FilterCriteria filter = new FilterCriteria().setItemName(item.getName()).setEndDate(ZonedDateTime.now()) + .setPageSize(1); + Iterable result = safeCaller.create(queryService, QueryablePersistenceService.class) + .onTimeout( + () -> logger.warn("Querying persistence service '{}' to restore '{}' takes more than {}ms.", + queryService.getId(), item.getName(), SafeCaller.DEFAULT_TIMEOUT)) + .onException(e -> logger.error( + "Exception occurred while querying persistence service '{}' to restore '{}': {}", + queryService.getId(), item.getName(), e.getMessage(), e)) + .build().query(filter); + if (result == null) { + // in case of an exception or timeout, the safe caller returns null + return; + } + Iterator it = result.iterator(); + if (it.hasNext()) { + HistoricItem historicItem = it.next(); + GenericItem genericItem = (GenericItem) item; + if (!UnDefType.NULL.equals(item.getState())) { + // someone else already restored the state or a new state was set + return; + } + genericItem.removeStateChangeListener(PersistenceManager.this); + genericItem.setState(historicItem.getState()); + genericItem.addStateChangeListener(PersistenceManager.this); + if (logger.isDebugEnabled()) { + logger.debug("Restored item state from '{}' for item '{}' -> '{}'", + DateTimeFormatter.ISO_ZONED_DATE_TIME.format(historicItem.getTimestamp()), item.getName(), + historicItem.getState()); + } + } + } + + public void scheduleNextForecastForItem(String itemName, Instant time, State state) { + ScheduledFuture oldJob = forecastJobs.remove(itemName); + if (oldJob != null) { + oldJob.cancel(true); + } + forecastJobs.put(itemName, scheduler.at(() -> restoreItemState(itemName, state), time)); + logger.trace("Scheduled forecasted value for {} at {}", itemName, time); + } + + public void scheduleNextPersistedForecastForItem(String itemName) { + Item item = itemRegistry.get(itemName); + if (item instanceof GenericItem) { + QueryablePersistenceService queryService = (QueryablePersistenceService) persistenceService; + FilterCriteria filter = new FilterCriteria().setItemName(itemName).setBeginDate(ZonedDateTime.now()) + .setOrdering(ASCENDING); + Iterator result = safeCaller.create(queryService, QueryablePersistenceService.class) + .onTimeout(() -> logger.warn("Querying persistence service '{}' takes more than {}ms.", + queryService.getId(), SafeCaller.DEFAULT_TIMEOUT)) + .onException(e -> logger.error("Exception occurred while querying persistence service '{}': {}", + queryService.getId(), e.getMessage(), e)) + .build().query(filter).iterator(); + while (result.hasNext()) { + HistoricItem next = result.next(); + if (next.getTimestamp().isAfter(ZonedDateTime.now())) { + scheduleNextForecastForItem(itemName, next.getTimestamp().toInstant(), next.getState()); + break; + } + } + } + } + + private void restoreItemState(String itemName, State state) { + Item item = itemRegistry.get(itemName); + if (item != null) { + ((GenericItem) item).setState(state); + } + scheduleNextPersistedForecastForItem(itemName); + } + private void persistJob(List itemConfigs) { itemConfigs.forEach(itemConfig -> { for (Item item : getAllItems(itemConfig)) { diff --git a/bundles/org.openhab.core.persistence/src/main/java/org/openhab/core/persistence/strategy/PersistenceStrategy.java b/bundles/org.openhab.core.persistence/src/main/java/org/openhab/core/persistence/strategy/PersistenceStrategy.java index 4cbf6ca659a..343615e8f8b 100644 --- a/bundles/org.openhab.core.persistence/src/main/java/org/openhab/core/persistence/strategy/PersistenceStrategy.java +++ b/bundles/org.openhab.core.persistence/src/main/java/org/openhab/core/persistence/strategy/PersistenceStrategy.java @@ -29,9 +29,12 @@ public static class Globals { public static final PersistenceStrategy UPDATE = new PersistenceStrategy("everyUpdate"); public static final PersistenceStrategy CHANGE = new PersistenceStrategy("everyChange"); public static final PersistenceStrategy RESTORE = new PersistenceStrategy("restoreOnStartup"); - - public static final Map STRATEGIES = Map.of(UPDATE.name, UPDATE, CHANGE.name, - CHANGE, RESTORE.name, RESTORE); + public static final PersistenceStrategy FORECAST = new PersistenceStrategy("forecast"); + public static final Map STRATEGIES = Map.of( // + UPDATE.name, UPDATE, // + CHANGE.name, CHANGE, // + RESTORE.name, RESTORE, // + FORECAST.name, FORECAST); } private final String name; diff --git a/bundles/org.openhab.core.persistence/src/test/java/org/openhab/core/persistence/internal/PersistenceManagerTest.java b/bundles/org.openhab.core.persistence/src/test/java/org/openhab/core/persistence/internal/PersistenceManagerTest.java index b33766fbd4a..59888597860 100644 --- a/bundles/org.openhab.core.persistence/src/test/java/org/openhab/core/persistence/internal/PersistenceManagerTest.java +++ b/bundles/org.openhab.core.persistence/src/test/java/org/openhab/core/persistence/internal/PersistenceManagerTest.java @@ -14,17 +14,20 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.clearInvocations; -import static org.mockito.Mockito.timeout; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyNoMoreInteractions; -import static org.mockito.Mockito.when; +import static org.mockito.ArgumentMatchers.*; +import static org.mockito.Mockito.*; import java.math.BigDecimal; +import java.time.Instant; +import java.time.ZoneId; import java.time.ZonedDateTime; +import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Objects; +import java.util.Set; import org.eclipse.jdt.annotation.NonNullByDefault; import org.eclipse.jdt.annotation.Nullable; @@ -32,6 +35,7 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.ArgumentCaptor; +import org.mockito.InOrder; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import org.mockito.junit.jupiter.MockitoSettings; @@ -39,14 +43,18 @@ import org.openhab.core.common.SafeCaller; import org.openhab.core.common.SafeCallerBuilder; import org.openhab.core.items.GroupItem; +import org.openhab.core.items.Item; import org.openhab.core.items.ItemNotFoundException; import org.openhab.core.items.ItemRegistry; import org.openhab.core.library.items.NumberItem; import org.openhab.core.library.items.StringItem; import org.openhab.core.library.types.DecimalType; import org.openhab.core.library.types.StringType; +import org.openhab.core.persistence.FilterCriteria; import org.openhab.core.persistence.HistoricItem; +import org.openhab.core.persistence.ModifiablePersistenceService; import org.openhab.core.persistence.PersistenceItemConfiguration; +import org.openhab.core.persistence.PersistenceItemInfo; import org.openhab.core.persistence.PersistenceService; import org.openhab.core.persistence.QueryablePersistenceService; import org.openhab.core.persistence.config.PersistenceAllConfig; @@ -61,10 +69,12 @@ import org.openhab.core.persistence.strategy.PersistenceStrategy; import org.openhab.core.scheduler.CronScheduler; import org.openhab.core.scheduler.ScheduledCompletableFuture; +import org.openhab.core.scheduler.Scheduler; import org.openhab.core.scheduler.SchedulerRunnable; import org.openhab.core.service.ReadyMarker; import org.openhab.core.service.ReadyService; import org.openhab.core.types.State; +import org.openhab.core.types.TimeSeries; import org.openhab.core.types.UnDefType; /** @@ -108,7 +118,10 @@ public String getName() { private static final String TEST_PERSISTENCE_SERVICE_ID = "testPersistenceService"; private static final String TEST_QUERYABLE_PERSISTENCE_SERVICE_ID = "testQueryablePersistenceService"; + private static final String TEST_MODIFIABLE_PERSISTENCE_SERVICE_ID = "testModifiablePersistenceService"; + private @NonNullByDefault({}) @Mock CronScheduler cronSchedulerMock; + private @NonNullByDefault({}) @Mock Scheduler schedulerMock; private @NonNullByDefault({}) @Mock ScheduledCompletableFuture scheduledFutureMock; private @NonNullByDefault({}) @Mock ItemRegistry itemRegistryMock; private @NonNullByDefault({}) @Mock SafeCaller safeCallerMock; @@ -118,6 +131,7 @@ public String getName() { private @NonNullByDefault({}) @Mock PersistenceService persistenceServiceMock; private @NonNullByDefault({}) @Mock QueryablePersistenceService queryablePersistenceServiceMock; + private @NonNullByDefault({}) @Mock ModifiablePersistenceService modifiablePersistenceServiceMock; private @NonNullByDefault({}) PersistenceManager manager; @@ -139,13 +153,15 @@ public void setUp() throws ItemNotFoundException { when(persistenceServiceMock.getId()).thenReturn(TEST_PERSISTENCE_SERVICE_ID); when(queryablePersistenceServiceMock.getId()).thenReturn(TEST_QUERYABLE_PERSISTENCE_SERVICE_ID); when(queryablePersistenceServiceMock.query(any())).thenReturn(List.of(TEST_HISTORIC_ITEM)); + when(modifiablePersistenceServiceMock.getId()).thenReturn(TEST_MODIFIABLE_PERSISTENCE_SERVICE_ID); - manager = new PersistenceManager(cronSchedulerMock, itemRegistryMock, safeCallerMock, readyServiceMock, - persistenceServiceConfigurationRegistryMock); + manager = new PersistenceManager(cronSchedulerMock, schedulerMock, itemRegistryMock, safeCallerMock, + readyServiceMock, persistenceServiceConfigurationRegistryMock); manager.addPersistenceService(persistenceServiceMock); manager.addPersistenceService(queryablePersistenceServiceMock); + manager.addPersistenceService(modifiablePersistenceServiceMock); - clearInvocations(persistenceServiceMock, queryablePersistenceServiceMock); + clearInvocations(persistenceServiceMock, queryablePersistenceServiceMock, modifiablePersistenceServiceMock); } @Test @@ -299,6 +315,82 @@ public void noRestoreOnStartupWhenItemNotNull() { verifyNoMoreInteractions(persistenceServiceMock); } + @Test + public void storeTimeSeriesAndForecastsScheduled() { + List> futures = new ArrayList<>(); + TestModifiablePersistenceService service = spy(new TestModifiablePersistenceService()); + manager.addPersistenceService(service); + + when(schedulerMock.at(any(SchedulerRunnable.class), any(Instant.class))).thenAnswer(i -> { + ScheduledCompletableFuture future = mock(ScheduledCompletableFuture.class); + when(future.getScheduledTime()).thenReturn(((Instant) i.getArgument(1)).atZone(ZoneId.systemDefault())); + futures.add(future); + return future; + }); + + addConfiguration(TestModifiablePersistenceService.ID, new PersistenceAllConfig(), + PersistenceStrategy.Globals.FORECAST, null); + + Instant time1 = Instant.now().minusSeconds(1000); + Instant time2 = Instant.now().plusSeconds(1000); + Instant time3 = Instant.now().plusSeconds(2000); + Instant time4 = Instant.now().plusSeconds(3000); + + // add elements + TimeSeries timeSeries = new TimeSeries(TimeSeries.Policy.ADD); + timeSeries.add(time1, new StringType("one")); + timeSeries.add(time2, new StringType("two")); + timeSeries.add(time3, new StringType("three")); + timeSeries.add(time4, new StringType("four")); + + manager.timeSeriesUpdated(TEST_ITEM, timeSeries); + InOrder inOrder = inOrder(service, schedulerMock); + + // verify elements are stored + timeSeries.getStates().forEach(entry -> inOrder.verify(service).store(any(Item.class), + eq(entry.timestamp().atZone(ZoneId.systemDefault())), eq(entry.state()))); + + // first element not scheduled, because it is in the past, check if second is scheduled + inOrder.verify(schedulerMock).at(any(SchedulerRunnable.class), eq(time2)); + inOrder.verifyNoMoreInteractions(); + + // replace elements + TimeSeries timeSeries2 = new TimeSeries(TimeSeries.Policy.REPLACE); + timeSeries2.add(time3, new StringType("three2")); + timeSeries2.add(time4, new StringType("four2")); + + manager.timeSeriesUpdated(TEST_ITEM, timeSeries2); + + // verify removal of old elements from service + ArgumentCaptor filterCaptor = ArgumentCaptor.forClass(FilterCriteria.class); + inOrder.verify(service).remove(filterCaptor.capture()); + FilterCriteria filterCriteria = filterCaptor.getValue(); + assertThat(filterCriteria.getItemName(), is(TEST_ITEM_NAME)); + assertThat(filterCriteria.getBeginDate(), is(time3.atZone(ZoneId.systemDefault()))); + assertThat(filterCriteria.getEndDate(), is(time4.atZone(ZoneId.systemDefault()))); + + // verify restore future is not cancelled + verify(futures.get(0), never()).cancel(anyBoolean()); + + // verify new values are stored + inOrder.verify(service, times(2)).store(any(Item.class), any(ZonedDateTime.class), any(State.class)); + inOrder.verifyNoMoreInteractions(); + + // try adding a new element in front and check it is correctly scheduled + Instant time5 = Instant.now().plusSeconds(500); + // add elements + TimeSeries timeSeries3 = new TimeSeries(TimeSeries.Policy.ADD); + timeSeries3.add(time5, new StringType("five")); + + manager.timeSeriesUpdated(TEST_ITEM, timeSeries3); + // verify old restore future is cancelled + inOrder.verify(service, times(1)).store(any(Item.class), any(ZonedDateTime.class), any(State.class)); + verify(futures.get(0)).cancel(true); + + // verify new restore future is properly created + inOrder.verify(schedulerMock).at(any(SchedulerRunnable.class), eq(time5)); + } + @Test public void cronStrategyIsScheduledAndCancelledAndPersistsValue() throws Exception { ArgumentCaptor runnableCaptor = ArgumentCaptor.forClass(SchedulerRunnable.class); @@ -402,4 +494,85 @@ private PersistenceServiceConfiguration addConfiguration(String serviceId, Persi return serviceConfiguration; } + + private static class TestModifiablePersistenceService implements ModifiablePersistenceService { + public static final String ID = "TMPS"; + private final Map states = new HashMap<>(); + + @Override + public void store(Item item, ZonedDateTime date, State state) { + states.put(date, state); + } + + @Override + public void store(Item item, ZonedDateTime date, State state, @Nullable String alias) { + store(item, date, state); + } + + @Override + public boolean remove(FilterCriteria filter) throws IllegalArgumentException { + ZonedDateTime begin = Objects.requireNonNull(filter.getBeginDate()); + ZonedDateTime end = Objects.requireNonNull(filter.getEndDate()); + List keys = states.keySet().stream().filter(t -> t.isAfter(begin) && t.isBefore(end)) + .toList(); + keys.forEach(states::remove); + return !keys.isEmpty(); + } + + @Override + public String getId() { + return ID; + } + + @Override + public String getLabel(@Nullable Locale locale) { + return ID; + } + + @Override + public void store(Item item) { + throw new UnsupportedOperationException(); + } + + @Override + public void store(Item item, @Nullable String alias) { + throw new UnsupportedOperationException(); + } + + @Override + public List getDefaultStrategies() { + return List.of(); + } + + @Override + @SuppressWarnings("unchecked") + public Iterable query(FilterCriteria filter) { + ZonedDateTime begin = Objects.requireNonNull(filter.getBeginDate()); + ZonedDateTime end = Objects.requireNonNull(filter.getEndDate()); + List keys = states.keySet().stream().filter(t -> t.isAfter(begin) && t.isBefore(end)) + .toList(); + return (Iterable) states.entrySet().stream().filter(e -> keys.contains(e.getKey())) + .map(e -> new HistoricItem() { + @Override + public ZonedDateTime getTimestamp() { + return e.getKey(); + } + + @Override + public State getState() { + return e.getValue(); + } + + @Override + public String getName() { + return "item"; + } + }).iterator(); + } + + @Override + public Set getItemInfo() { + return Set.of(); + } + } } diff --git a/bundles/org.openhab.core.test.magic/src/main/java/org/openhab/core/magic/binding/MagicBindingConstants.java b/bundles/org.openhab.core.test.magic/src/main/java/org/openhab/core/magic/binding/MagicBindingConstants.java index 166f29c2f93..47d1981b099 100644 --- a/bundles/org.openhab.core.test.magic/src/main/java/org/openhab/core/magic/binding/MagicBindingConstants.java +++ b/bundles/org.openhab.core.test.magic/src/main/java/org/openhab/core/magic/binding/MagicBindingConstants.java @@ -48,6 +48,7 @@ public class MagicBindingConstants { public static final ThingTypeUID THING_TYPE_DYNAMIC_STATE_DESCRIPTION = new ThingTypeUID(BINDING_ID, "dynamic-state-description"); public static final ThingTypeUID THING_TYPE_ONLINE_OFFLINE = new ThingTypeUID(BINDING_ID, "online-offline"); + public static final ThingTypeUID THING_TYPE_TIMESERIES = new ThingTypeUID(BINDING_ID, "timeseries"); // bridged things public static final ThingTypeUID THING_TYPE_BRIDGE_1 = new ThingTypeUID(BINDING_ID, "magic-bridge1"); @@ -67,7 +68,7 @@ public class MagicBindingConstants { public static final String CHANNEL_BATTERY_LEVEL = "battery-level"; public static final String CHANNEL_SYSTEM_COMMAND = "systemcommand"; public static final String CHANNEL_SIGNAL_STRENGTH = "signal-strength"; - + public static final String CHANNEL_FORECAST = "forecast"; // Firmware update needed models public static final String UPDATE_MODEL_PROPERTY = "updateModel"; diff --git a/bundles/org.openhab.core.test.magic/src/main/java/org/openhab/core/magic/binding/handler/MagicTimeSeriesHandler.java b/bundles/org.openhab.core.test.magic/src/main/java/org/openhab/core/magic/binding/handler/MagicTimeSeriesHandler.java new file mode 100644 index 00000000000..a596a7719e2 --- /dev/null +++ b/bundles/org.openhab.core.test.magic/src/main/java/org/openhab/core/magic/binding/handler/MagicTimeSeriesHandler.java @@ -0,0 +1,111 @@ +/** + * Copyright (c) 2010-2023 Contributors to the openHAB project + * + * See the NOTICE file(s) distributed with this work for additional + * information. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + * + * SPDX-License-Identifier: EPL-2.0 + */ +package org.openhab.core.magic.binding.handler; + +import static org.openhab.core.magic.binding.MagicBindingConstants.CHANNEL_FORECAST; +import static org.openhab.core.types.TimeSeries.Policy.ADD; + +import java.time.Duration; +import java.time.Instant; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +import org.eclipse.jdt.annotation.NonNullByDefault; +import org.eclipse.jdt.annotation.Nullable; +import org.openhab.core.library.types.DecimalType; +import org.openhab.core.thing.ChannelUID; +import org.openhab.core.thing.Thing; +import org.openhab.core.thing.ThingStatus; +import org.openhab.core.thing.binding.BaseThingHandler; +import org.openhab.core.types.Command; +import org.openhab.core.types.TimeSeries; + +/** + * The {@link MagicTimeSeriesHandler} is capable of providing a series of different forecasts + * + * @author Jan N. Klug - Initial contribution + */ +@NonNullByDefault +public class MagicTimeSeriesHandler extends BaseThingHandler { + + private @Nullable ScheduledFuture scheduledJob; + private Configuration configuration = new Configuration(); + + public MagicTimeSeriesHandler(Thing thing) { + super(thing); + } + + @Override + public void handleCommand(ChannelUID channelUID, Command command) { + // no-op + } + + @Override + public void initialize() { + configuration = getConfigAs(Configuration.class); + startScheduledJob(); + + updateStatus(ThingStatus.ONLINE); + } + + @Override + public void dispose() { + stopScheduledJob(); + } + + private void startScheduledJob() { + ScheduledFuture localScheduledJob = scheduledJob; + if (localScheduledJob == null || localScheduledJob.isCancelled()) { + scheduledJob = scheduler.scheduleWithFixedDelay(() -> { + Instant now = Instant.now(); + TimeSeries timeSeries = new TimeSeries(ADD); + Duration stepSize = Duration.ofSeconds(configuration.interval / configuration.count); + double range = configuration.max - configuration.min; + for (int i = 1; i <= configuration.count; i++) { + double value = switch (configuration.type) { + case RND -> Math.random() * range + configuration.min; + case ASC -> (range / configuration.count) * i + configuration.min; + case DESC -> configuration.max + (range / configuration.count) * i; + }; + timeSeries.add(now.plus(stepSize.multipliedBy(i)), new DecimalType(value)); + } + sendTimeSeries(CHANNEL_FORECAST, timeSeries); + }, 0, configuration.interval, TimeUnit.SECONDS); + } + } + + private void stopScheduledJob() { + ScheduledFuture localScheduledJob = scheduledJob; + if (localScheduledJob != null && !localScheduledJob.isCancelled()) { + localScheduledJob.cancel(true); + scheduledJob = null; + } + } + + public static class Configuration { + public int interval = 600; + public Type type = Type.RND; + public double min = 0.0; + public double max = 100.0; + public int count = 10; + + public Configuration() { + } + } + + public enum Type { + RND, + ASC, + DESC + } +} diff --git a/bundles/org.openhab.core.test.magic/src/main/java/org/openhab/core/magic/binding/internal/MagicHandlerFactory.java b/bundles/org.openhab.core.test.magic/src/main/java/org/openhab/core/magic/binding/internal/MagicHandlerFactory.java index 5cfe5b7e286..b2ae4f9464d 100644 --- a/bundles/org.openhab.core.test.magic/src/main/java/org/openhab/core/magic/binding/internal/MagicHandlerFactory.java +++ b/bundles/org.openhab.core.test.magic/src/main/java/org/openhab/core/magic/binding/internal/MagicHandlerFactory.java @@ -38,6 +38,7 @@ import org.openhab.core.magic.binding.handler.MagicPlayerHandler; import org.openhab.core.magic.binding.handler.MagicRollershutterHandler; import org.openhab.core.magic.binding.handler.MagicThermostatThingHandler; +import org.openhab.core.magic.binding.handler.MagicTimeSeriesHandler; import org.openhab.core.thing.Bridge; import org.openhab.core.thing.Thing; import org.openhab.core.thing.ThingTypeUID; @@ -62,8 +63,8 @@ public class MagicHandlerFactory extends BaseThingHandlerFactory { THING_TYPE_CONTACT_SENSOR, THING_TYPE_CONFIG_THING, THING_TYPE_DELAYED_THING, THING_TYPE_LOCATION, THING_TYPE_THERMOSTAT, THING_TYPE_FIRMWARE_UPDATE, THING_TYPE_BRIDGE_1, THING_TYPE_BRIDGE_2, THING_TYPE_BRIDGED_THING, THING_TYPE_CHATTY_THING, THING_TYPE_ROLLERSHUTTER, THING_TYPE_PLAYER, - THING_TYPE_IMAGE, THING_TYPE_ACTION_MODULE, THING_TYPE_DYNAMIC_STATE_DESCRIPTION, - THING_TYPE_ONLINE_OFFLINE); + THING_TYPE_IMAGE, THING_TYPE_ACTION_MODULE, THING_TYPE_DYNAMIC_STATE_DESCRIPTION, THING_TYPE_ONLINE_OFFLINE, + THING_TYPE_TIMESERIES); private final MagicDynamicCommandDescriptionProvider commandDescriptionProvider; private final MagicDynamicStateDescriptionProvider stateDescriptionProvider; @@ -125,6 +126,8 @@ public boolean supportsThingType(ThingTypeUID thingTypeUID) { return new MagicOnlineOfflineHandler(thing); } else if (THING_TYPE_BRIDGE_1.equals(thingTypeUID) || THING_TYPE_BRIDGE_2.equals(thingTypeUID)) { return new MagicBridgeHandler((Bridge) thing); + } else if (THING_TYPE_TIMESERIES.equals(thingTypeUID)) { + return new MagicTimeSeriesHandler(thing); } return null; diff --git a/bundles/org.openhab.core.test.magic/src/main/resources/OH-INF/thing/channel-types.xml b/bundles/org.openhab.core.test.magic/src/main/resources/OH-INF/thing/channel-types.xml index 0fb616e7b89..8615a04f435 100644 --- a/bundles/org.openhab.core.test.magic/src/main/resources/OH-INF/thing/channel-types.xml +++ b/bundles/org.openhab.core.test.magic/src/main/resources/OH-INF/thing/channel-types.xml @@ -160,4 +160,8 @@ time + + Number + + diff --git a/bundles/org.openhab.core.test.magic/src/main/resources/OH-INF/thing/thing-types.xml b/bundles/org.openhab.core.test.magic/src/main/resources/OH-INF/thing/thing-types.xml index f29bf39267b..ff8c88af0ff 100644 --- a/bundles/org.openhab.core.test.magic/src/main/resources/OH-INF/thing/thing-types.xml +++ b/bundles/org.openhab.core.test.magic/src/main/resources/OH-INF/thing/thing-types.xml @@ -234,4 +234,44 @@ + + + + Demonstrates the use of TimeSeries as forecast. + + + + + + + The interval to send the generated data. + 600 + + + + How to generate the values. + + + + + + RND + + + + The minimum value. + 0 + + + + The maximum value. + 100 + + + + The number of values to generate. + 10 + + + diff --git a/bundles/org.openhab.core.thing/src/main/java/org/openhab/core/thing/binding/BaseThingHandler.java b/bundles/org.openhab.core.thing/src/main/java/org/openhab/core/thing/binding/BaseThingHandler.java index 91efce1e6f4..e5fa558b3bd 100644 --- a/bundles/org.openhab.core.thing/src/main/java/org/openhab/core/thing/binding/BaseThingHandler.java +++ b/bundles/org.openhab.core.thing/src/main/java/org/openhab/core/thing/binding/BaseThingHandler.java @@ -41,6 +41,7 @@ import org.openhab.core.types.Command; import org.openhab.core.types.RefreshType; import org.openhab.core.types.State; +import org.openhab.core.types.TimeSeries; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -60,6 +61,7 @@ * @author Stefan Bußweiler - Added new thing status handling, refactorings thing/bridge life cycle * @author Kai Kreuzer - Refactored isLinked method to not use deprecated functions anymore * @author Christoph Weitkamp - Moved OSGI ServiceTracker from BaseThingHandler to ThingHandlerCallback + * @author Jan N. Klug - added time series support */ @NonNullByDefault public abstract class BaseThingHandler implements ThingHandler { @@ -287,6 +289,36 @@ protected void updateState(String channelID, State state) { updateState(channelUID, state); } + /** + * Send a time series to the channel. This can be used to transfer historic data or forecasts. + * + * @param channelUID unique id of the channel + * @param timeSeries the {@link TimeSeries} that is sent + */ + protected void sendTimeSeries(ChannelUID channelUID, TimeSeries timeSeries) { + synchronized (this) { + ThingHandlerCallback callback1 = this.callback; + if (callback1 != null) { + callback1.sendTimeSeries(channelUID, timeSeries); + } else { + logger.warn( + "Handler {} of thing {} tried sending to channel {} although the handler was already disposed.", + this.getClass().getSimpleName(), channelUID.getThingUID(), channelUID.getId()); + } + } + } + + /** + * Send a time series to the channel. This can be used to transfer historic data or forecasts. + * + * @param channelID id of the channel + * @param timeSeries the {@link TimeSeries} that is sent + */ + protected void sendTimeSeries(String channelID, TimeSeries timeSeries) { + ChannelUID channelUID = new ChannelUID(this.getThing().getUID(), channelID); + sendTimeSeries(channelUID, timeSeries); + } + /** * Emits an event for the given channel. * diff --git a/bundles/org.openhab.core.thing/src/main/java/org/openhab/core/thing/binding/ThingHandlerCallback.java b/bundles/org.openhab.core.thing/src/main/java/org/openhab/core/thing/binding/ThingHandlerCallback.java index 0f99a65435e..33763184d25 100644 --- a/bundles/org.openhab.core.thing/src/main/java/org/openhab/core/thing/binding/ThingHandlerCallback.java +++ b/bundles/org.openhab.core.thing/src/main/java/org/openhab/core/thing/binding/ThingHandlerCallback.java @@ -35,6 +35,7 @@ import org.openhab.core.thing.type.ChannelTypeUID; import org.openhab.core.types.Command; import org.openhab.core.types.State; +import org.openhab.core.types.TimeSeries; /** * {@link ThingHandlerCallback} is callback interface for {@link ThingHandler}s. The implementation of a @@ -65,6 +66,14 @@ public interface ThingHandlerCallback { */ void postCommand(ChannelUID channelUID, Command command); + /** + * Informs about a time series, whcihs is send from the channel. + * + * @param channelUID channel UID + * @param timeSeries time series + */ + void sendTimeSeries(ChannelUID channelUID, TimeSeries timeSeries); + /** * Informs about an updated status of a thing. * diff --git a/bundles/org.openhab.core.thing/src/main/java/org/openhab/core/thing/internal/CommunicationManager.java b/bundles/org.openhab.core.thing/src/main/java/org/openhab/core/thing/internal/CommunicationManager.java index ab6d5f9dc15..10fb9d414a0 100644 --- a/bundles/org.openhab.core.thing/src/main/java/org/openhab/core/thing/internal/CommunicationManager.java +++ b/bundles/org.openhab.core.thing/src/main/java/org/openhab/core/thing/internal/CommunicationManager.java @@ -71,9 +71,11 @@ import org.openhab.core.thing.profiles.ProfileFactory; import org.openhab.core.thing.profiles.ProfileTypeUID; import org.openhab.core.thing.profiles.StateProfile; +import org.openhab.core.thing.profiles.TimeSeriesProfile; import org.openhab.core.thing.profiles.TriggerProfile; import org.openhab.core.types.Command; import org.openhab.core.types.State; +import org.openhab.core.types.TimeSeries; import org.openhab.core.types.Type; import org.osgi.service.component.annotations.Activate; import org.osgi.service.component.annotations.Component; @@ -90,6 +92,7 @@ * It mainly mediates commands, state updates and triggers from ThingHandlers to the framework and vice versa. * * @author Simon Kaufmann - Initial contribution factored out of ThingManger + * @author Jan N. Klug - Added time series support */ @NonNullByDefault @Component(service = { EventSubscriber.class, CommunicationManager.class }, immediate = true) @@ -520,6 +523,20 @@ public void postCommand(ChannelUID channelUID, Command command) { }); } + public void sendTimeSeries(ChannelUID channelUID, TimeSeries timeSeries) { + ThingUID thingUID = channelUID.getThingUID(); + Thing thing = thingRegistry.get(thingUID); + handleCallFromHandler(channelUID, thing, profile -> { + // TODO: check which profiles need enhancements + if (profile instanceof TimeSeriesProfile timeSeriesProfile) { + timeSeriesProfile.onTimeSeriesFromHandler(timeSeries); + } else { + logger.warn("Profile '{}' on channel {} does not support time series.", profile.getProfileTypeUID(), + channelUID); + } + }); + } + private void handleCallFromHandler(ChannelUID channelUID, @Nullable Thing thing, Consumer action) { itemChannelLinkRegistry.getLinks(channelUID).forEach(link -> { final Item item = getItem(link.getItemName()); diff --git a/bundles/org.openhab.core.thing/src/main/java/org/openhab/core/thing/internal/ThingHandlerCallbackImpl.java b/bundles/org.openhab.core.thing/src/main/java/org/openhab/core/thing/internal/ThingHandlerCallbackImpl.java index 1ffab2b1a34..184b652dc01 100644 --- a/bundles/org.openhab.core.thing/src/main/java/org/openhab/core/thing/internal/ThingHandlerCallbackImpl.java +++ b/bundles/org.openhab.core.thing/src/main/java/org/openhab/core/thing/internal/ThingHandlerCallbackImpl.java @@ -42,6 +42,7 @@ import org.openhab.core.thing.util.ThingHandlerHelper; import org.openhab.core.types.Command; import org.openhab.core.types.State; +import org.openhab.core.types.TimeSeries; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -70,6 +71,11 @@ public void postCommand(ChannelUID channelUID, Command command) { thingManager.communicationManager.postCommand(channelUID, command); } + @Override + public void sendTimeSeries(ChannelUID channelUID, TimeSeries timeSeries) { + thingManager.communicationManager.sendTimeSeries(channelUID, timeSeries); + } + @Override public void channelTriggered(Thing thing, ChannelUID channelUID, String event) { thingManager.communicationManager.channelTriggered(thing, channelUID, event); diff --git a/bundles/org.openhab.core.thing/src/main/java/org/openhab/core/thing/internal/profiles/ProfileCallbackImpl.java b/bundles/org.openhab.core.thing/src/main/java/org/openhab/core/thing/internal/profiles/ProfileCallbackImpl.java index a0f40e53059..6f30e255c72 100644 --- a/bundles/org.openhab.core.thing/src/main/java/org/openhab/core/thing/internal/profiles/ProfileCallbackImpl.java +++ b/bundles/org.openhab.core.thing/src/main/java/org/openhab/core/thing/internal/profiles/ProfileCallbackImpl.java @@ -33,6 +33,7 @@ import org.openhab.core.thing.util.ThingHandlerHelper; import org.openhab.core.types.Command; import org.openhab.core.types.State; +import org.openhab.core.types.TimeSeries; import org.openhab.core.types.TypeParser; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -145,6 +146,19 @@ public void sendUpdate(State state) { ItemEventFactory.createStateEvent(link.getItemName(), acceptedState, link.getLinkedUID().toString())); } + @Override + public void sendTimeSeries(TimeSeries timeSeries) { + Item item = itemProvider.apply(link.getItemName()); + if (item == null) { + logger.warn("Cannot send time series event '{}' for item '{}', because no item could be found.", timeSeries, + link.getItemName()); + return; + } + + eventPublisher.post( + ItemEventFactory.createTimeSeriesEvent(link.getItemName(), timeSeries, link.getLinkedUID().toString())); + } + @FunctionalInterface public interface AcceptedTypeConverter { @Nullable diff --git a/bundles/org.openhab.core.thing/src/main/java/org/openhab/core/thing/internal/profiles/SystemDefaultProfile.java b/bundles/org.openhab.core.thing/src/main/java/org/openhab/core/thing/internal/profiles/SystemDefaultProfile.java index 1b5b7881aee..d54f02bc3ec 100644 --- a/bundles/org.openhab.core.thing/src/main/java/org/openhab/core/thing/internal/profiles/SystemDefaultProfile.java +++ b/bundles/org.openhab.core.thing/src/main/java/org/openhab/core/thing/internal/profiles/SystemDefaultProfile.java @@ -16,10 +16,11 @@ import org.openhab.core.thing.binding.ThingHandler; import org.openhab.core.thing.profiles.ProfileCallback; import org.openhab.core.thing.profiles.ProfileTypeUID; -import org.openhab.core.thing.profiles.StateProfile; import org.openhab.core.thing.profiles.SystemProfiles; +import org.openhab.core.thing.profiles.TimeSeriesProfile; import org.openhab.core.types.Command; import org.openhab.core.types.State; +import org.openhab.core.types.TimeSeries; /** * This is the default profile for stateful channels. @@ -30,7 +31,7 @@ * @author Simon Kaufmann - Initial contribution */ @NonNullByDefault -public class SystemDefaultProfile implements StateProfile { +public class SystemDefaultProfile implements TimeSeriesProfile { private final ProfileCallback callback; @@ -58,6 +59,11 @@ public void onCommandFromHandler(Command command) { callback.sendCommand(command); } + @Override + public void onTimeSeriesFromHandler(TimeSeries timeSeries) { + callback.sendTimeSeries(timeSeries); + } + @Override public void onStateUpdateFromItem(State state) { } diff --git a/bundles/org.openhab.core.thing/src/main/java/org/openhab/core/thing/profiles/ProfileCallback.java b/bundles/org.openhab.core.thing/src/main/java/org/openhab/core/thing/profiles/ProfileCallback.java index 7c2483c8cb3..2e0eeecd993 100644 --- a/bundles/org.openhab.core.thing/src/main/java/org/openhab/core/thing/profiles/ProfileCallback.java +++ b/bundles/org.openhab.core.thing/src/main/java/org/openhab/core/thing/profiles/ProfileCallback.java @@ -16,6 +16,7 @@ import org.openhab.core.thing.link.ItemChannelLink; import org.openhab.core.types.Command; import org.openhab.core.types.State; +import org.openhab.core.types.TimeSeries; /** * Gives access to the framework features for continuing the communication flow. @@ -52,4 +53,11 @@ public interface ProfileCallback { * @param state */ void sendUpdate(State state); + + /** + * Send a {@link TimeSeries} update to the framework. + * + * @param timeSeries + */ + void sendTimeSeries(TimeSeries timeSeries); } diff --git a/bundles/org.openhab.core.thing/src/main/java/org/openhab/core/thing/profiles/TimeSeriesProfile.java b/bundles/org.openhab.core.thing/src/main/java/org/openhab/core/thing/profiles/TimeSeriesProfile.java new file mode 100644 index 00000000000..9625069bcce --- /dev/null +++ b/bundles/org.openhab.core.thing/src/main/java/org/openhab/core/thing/profiles/TimeSeriesProfile.java @@ -0,0 +1,32 @@ +/** + * Copyright (c) 2010-2023 Contributors to the openHAB project + * + * See the NOTICE file(s) distributed with this work for additional + * information. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + * + * SPDX-License-Identifier: EPL-2.0 + */ +package org.openhab.core.thing.profiles; + +import org.eclipse.jdt.annotation.NonNullByDefault; +import org.openhab.core.types.TimeSeries; + +/** + * The {@link TimeSeriesProfile} extends the {@link StateProfile} to support {@link TimeSeries} updates + * + * @author Jan N. Klug - Initial contribution + */ +@NonNullByDefault +public interface TimeSeriesProfile extends StateProfile { + + /** + * If a binding sends a time-series to a channel, this method will be called for each linked item. + * + * @param timeSeries the time-series + */ + void onTimeSeriesFromHandler(TimeSeries timeSeries); +} diff --git a/bundles/org.openhab.core.thing/src/test/java/org/openhab/core/thing/CommunicationManagerConversionTest.java b/bundles/org.openhab.core.thing/src/test/java/org/openhab/core/thing/CommunicationManagerConversionTest.java deleted file mode 100644 index 432a4398321..00000000000 --- a/bundles/org.openhab.core.thing/src/test/java/org/openhab/core/thing/CommunicationManagerConversionTest.java +++ /dev/null @@ -1,141 +0,0 @@ -/** - * Copyright (c) 2010-2023 Contributors to the openHAB project - * - * See the NOTICE file(s) distributed with this work for additional - * information. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License 2.0 which is available at - * http://www.eclipse.org/legal/epl-2.0 - * - * SPDX-License-Identifier: EPL-2.0 - */ -package org.openhab.core.thing; - -import static org.junit.jupiter.api.Assertions.*; - -import java.lang.reflect.InvocationTargetException; -import java.util.ArrayList; -import java.util.List; -import java.util.stream.Stream; - -import org.eclipse.jdt.annotation.NonNullByDefault; -import org.junit.jupiter.api.Disabled; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.Arguments; -import org.junit.jupiter.params.provider.MethodSource; -import org.openhab.core.items.Item; -import org.openhab.core.library.items.CallItem; -import org.openhab.core.library.items.ColorItem; -import org.openhab.core.library.items.ContactItem; -import org.openhab.core.library.items.DateTimeItem; -import org.openhab.core.library.items.DimmerItem; -import org.openhab.core.library.items.ImageItem; -import org.openhab.core.library.items.LocationItem; -import org.openhab.core.library.items.PlayerItem; -import org.openhab.core.library.items.RollershutterItem; -import org.openhab.core.library.items.StringItem; -import org.openhab.core.library.types.DateTimeType; -import org.openhab.core.library.types.DecimalType; -import org.openhab.core.library.types.HSBType; -import org.openhab.core.library.types.IncreaseDecreaseType; -import org.openhab.core.library.types.NextPreviousType; -import org.openhab.core.library.types.OnOffType; -import org.openhab.core.library.types.OpenClosedType; -import org.openhab.core.library.types.PercentType; -import org.openhab.core.library.types.PlayPauseType; -import org.openhab.core.library.types.PointType; -import org.openhab.core.library.types.QuantityType; -import org.openhab.core.library.types.RawType; -import org.openhab.core.library.types.RewindFastforwardType; -import org.openhab.core.library.types.StringType; -import org.openhab.core.library.types.UpDownType; -import org.openhab.core.types.Command; -import org.openhab.core.types.State; -import org.openhab.core.types.Type; -import org.openhab.core.types.UnDefType; - -/** - * @author Jan N. Klug - Initial contribution - */ -@NonNullByDefault -public class CommunicationManagerConversionTest { - // TODO: remove test - only to show CommunicationManager is too complex - - private static final List> ITEM_TYPES = List.of(CallItem.class, ColorItem.class, - ContactItem.class, DateTimeItem.class, DimmerItem.class, ImageItem.class, LocationItem.class, - PlayerItem.class, RollershutterItem.class, StringItem.class); - - private static final List> TYPES = List.of(DateTimeType.class, DecimalType.class, - HSBType.class, IncreaseDecreaseType.class, NextPreviousType.class, OnOffType.class, OpenClosedType.class, - PercentType.class, PlayPauseType.class, PointType.class, QuantityType.class, RawType.class, - RewindFastforwardType.class, StringType.class, UpDownType.class, UnDefType.class); - - private static Stream arguments() - throws NoSuchMethodException, InvocationTargetException, InstantiationException, IllegalAccessException { - List arguments = new ArrayList<>(); - for (Class itemType : ITEM_TYPES) { - Item item = itemType.getDeclaredConstructor(String.class).newInstance("testItem"); - for (Class type : TYPES) { - if (type.isEnum()) { - arguments.add(Arguments.of(item, type.getEnumConstants()[0])); - } else if (type == RawType.class) { - arguments.add(Arguments.of(item, new RawType(new byte[] {}, "mimeType"))); - } else { - arguments.add(Arguments.of(item, type.getDeclaredConstructor().newInstance())); - } - } - } - return arguments.stream(); - } - - @Disabled - @MethodSource("arguments") - @ParameterizedTest - public void testCommand(Item item, Type originalType) { - Type returnType = null; - - List> acceptedTypes = item.getAcceptedCommandTypes(); - if (acceptedTypes.contains(originalType.getClass())) { - returnType = originalType; - } else { - // Look for class hierarchy and convert appropriately - for (Class typeClass : acceptedTypes) { - if (!typeClass.isEnum() && typeClass.isAssignableFrom(originalType.getClass()) // - && State.class.isAssignableFrom(typeClass) && originalType instanceof State state) { - returnType = state.as((Class) typeClass); - } - } - } - - if (returnType != null && !returnType.getClass().equals(originalType.getClass())) { - fail("CommunicationManager did a conversion for target item " + item.getType() + " from " - + originalType.getClass() + " to " + returnType.getClass()); - } - } - - @MethodSource("arguments") - @ParameterizedTest - public void testState(Item item, Type originalType) { - Type returnType = null; - - List> acceptedTypes = item.getAcceptedDataTypes(); - if (acceptedTypes.contains(originalType.getClass())) { - returnType = originalType; - } else { - // Look for class hierarchy and convert appropriately - for (Class typeClass : acceptedTypes) { - if (!typeClass.isEnum() && typeClass.isAssignableFrom(originalType.getClass()) // - && State.class.isAssignableFrom(typeClass) && originalType instanceof State state) { - returnType = state.as((Class) typeClass); - - } - } - } - - if (returnType != null && !returnType.equals(originalType)) { - fail("CommunicationManager did a conversion for target item " + item.getType() + " from " - + originalType.getClass() + " to " + returnType.getClass()); - } - } -} diff --git a/bundles/org.openhab.core.thing/src/test/java/org/openhab/core/thing/internal/profiles/SystemDefaultProfileTest.java b/bundles/org.openhab.core.thing/src/test/java/org/openhab/core/thing/internal/profiles/SystemDefaultProfileTest.java index 94952d03acf..720f0008515 100644 --- a/bundles/org.openhab.core.thing/src/test/java/org/openhab/core/thing/internal/profiles/SystemDefaultProfileTest.java +++ b/bundles/org.openhab.core.thing/src/test/java/org/openhab/core/thing/internal/profiles/SystemDefaultProfileTest.java @@ -22,6 +22,7 @@ import org.mockito.junit.jupiter.MockitoExtension; import org.openhab.core.library.types.OnOffType; import org.openhab.core.thing.profiles.ProfileCallback; +import org.openhab.core.types.TimeSeries; /** * @@ -60,4 +61,15 @@ public void testPostCommand() { verify(callbackMock).sendCommand(eq(OnOffType.ON)); verifyNoMoreInteractions(callbackMock); } + + @Test + public void testSendTimeSeries() { + TimeSeries timeSeries = new TimeSeries(TimeSeries.Policy.ADD); + + SystemDefaultProfile profile = new SystemDefaultProfile(callbackMock); + profile.onTimeSeriesFromHandler(timeSeries); + + verify(callbackMock).sendTimeSeries(timeSeries); + verifyNoMoreInteractions(callbackMock); + } } diff --git a/bundles/org.openhab.core/src/main/java/org/openhab/core/internal/items/ItemUpdater.java b/bundles/org.openhab.core/src/main/java/org/openhab/core/internal/items/ItemUpdater.java index 3b5db162036..6d41ed7962a 100644 --- a/bundles/org.openhab.core/src/main/java/org/openhab/core/internal/items/ItemUpdater.java +++ b/bundles/org.openhab.core/src/main/java/org/openhab/core/internal/items/ItemUpdater.java @@ -22,6 +22,7 @@ import org.openhab.core.items.events.AbstractItemEventSubscriber; import org.openhab.core.items.events.ItemCommandEvent; import org.openhab.core.items.events.ItemStateEvent; +import org.openhab.core.items.events.ItemTimeSeriesEvent; import org.openhab.core.types.State; import org.osgi.service.component.annotations.Activate; import org.osgi.service.component.annotations.Component; @@ -96,4 +97,16 @@ protected void receiveCommand(ItemCommandEvent commandEvent) { logger.debug("Received command for non-existing item: {}", e.getMessage()); } } + + @Override + protected void receiveTimeSeries(ItemTimeSeriesEvent timeSeriesEvent) { + try { + Item item = itemRegistry.getItem(timeSeriesEvent.getItemName()); + if (!(item instanceof GroupItem) && item instanceof GenericItem genericItem) { + genericItem.setTimeSeries(timeSeriesEvent.getTimeSeries()); + } + } catch (ItemNotFoundException e) { + logger.debug("Received command for non-existing item: {}", e.getMessage()); + } + } } diff --git a/bundles/org.openhab.core/src/main/java/org/openhab/core/items/GenericItem.java b/bundles/org.openhab.core/src/main/java/org/openhab/core/items/GenericItem.java index 1582f93a449..e6f4d062639 100644 --- a/bundles/org.openhab.core/src/main/java/org/openhab/core/items/GenericItem.java +++ b/bundles/org.openhab.core/src/main/java/org/openhab/core/items/GenericItem.java @@ -39,6 +39,7 @@ import org.openhab.core.types.RefreshType; import org.openhab.core.types.State; import org.openhab.core.types.StateDescription; +import org.openhab.core.types.TimeSeries; import org.openhab.core.types.UnDefType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -51,6 +52,7 @@ * @author Kai Kreuzer - Initial contribution * @author Andre Fuechsel - Added tags * @author Stefan Bußweiler - Migration to new ESH event concept + * @author Jan N. Klug - Added time series support */ @NonNullByDefault public abstract class GenericItem implements ActiveItem { @@ -64,6 +66,9 @@ public abstract class GenericItem implements ActiveItem { protected Set listeners = new CopyOnWriteArraySet<>( Collections.newSetFromMap(new WeakHashMap<>())); + protected Set timeSeriesListeners = new CopyOnWriteArraySet<>( + Collections.newSetFromMap(new WeakHashMap<>())); + protected List groupNames = new ArrayList<>(); protected Set tags = new HashSet<>(); @@ -229,6 +234,50 @@ protected final void applyState(State state) { } } + /** + * Set a new time series. + *

+ * Subclasses may override this method in order to do necessary conversions upfront. Afterwards, + * {@link #applyTimeSeries(TimeSeries)} should be called by classes overriding this method. + *

+ * A time series may only contain events that are compatible with the item's internal state. + * + * @param timeSeries new time series of this item + */ + public void setTimeSeries(TimeSeries timeSeries) { + applyTimeSeries(timeSeries); + } + + /** + * Sets new time series, notifies listeners and sends events. + *

+ * Classes overriding the {@link #setTimeSeries(TimeSeries)} method should call this method in order to actually set + * the time series, inform listeners and send the event. + *

+ * A time series may only contain events that are compatible with the item's internal state. + * + * @param timeSeries new time series of this item + */ + protected final void applyTimeSeries(TimeSeries timeSeries) { + // notify listeners + Set clonedListeners = new CopyOnWriteArraySet<>(timeSeriesListeners); + ExecutorService pool = ThreadPoolManager.getPool(ITEM_THREADPOOLNAME); + clonedListeners.forEach(listener -> pool.execute(() -> { + try { + listener.timeSeriesUpdated(GenericItem.this, timeSeries); + } catch (Exception e) { + logger.warn("failed notifying listener '{}' about timeseries update of item {}: {}", listener, + GenericItem.this.getName(), e.getMessage(), e); + } + })); + + // send event + EventPublisher eventPublisher1 = this.eventPublisher; + if (eventPublisher1 != null) { + eventPublisher1.post(ItemEventFactory.createTimeSeriesUpdatedEvent(this.name, timeSeries, null)); + } + } + private void sendStateUpdatedEvent(State newState) { EventPublisher eventPublisher1 = this.eventPublisher; if (eventPublisher1 != null) { @@ -314,6 +363,18 @@ public void removeStateChangeListener(StateChangeListener listener) { } } + public void addTimeSeriesListener(TimeSeriesListener listener) { + synchronized (timeSeriesListeners) { + timeSeriesListeners.add(listener); + } + } + + public void removeTimeSeriesListener(TimeSeriesListener listener) { + synchronized (timeSeriesListeners) { + timeSeriesListeners.remove(listener); + } + } + @Override public int hashCode() { final int prime = 31; @@ -437,8 +498,7 @@ public void setCategory(@Nullable String category) { * @return true if state is an acceptedDataType or subclass thereof */ public boolean isAcceptedState(List> acceptedDataTypes, State state) { - return acceptedDataTypes.stream().map(clazz -> clazz.isAssignableFrom(state.getClass())).filter(found -> found) - .findAny().isPresent(); + return acceptedDataTypes.stream().anyMatch(clazz -> clazz.isAssignableFrom(state.getClass())); } protected void logSetTypeError(State state) { @@ -446,7 +506,12 @@ protected void logSetTypeError(State state) { state.getClass().getSimpleName(), getName(), getClass().getSimpleName()); } - private @Nullable CommandDescription stateOptions2CommandOptions(StateDescription stateDescription) { + protected void logSetTypeError(TimeSeries timeSeries) { + logger.error("Tried to set invalid state in time series {} on item {} of type {}, ignoring it", timeSeries, + getName(), getClass().getSimpleName()); + } + + private CommandDescription stateOptions2CommandOptions(StateDescription stateDescription) { CommandDescriptionBuilder builder = CommandDescriptionBuilder.create(); stateDescription.getOptions() .forEach(so -> builder.withCommandOption(new CommandOption(so.getValue(), so.getLabel()))); diff --git a/bundles/org.openhab.core/src/main/java/org/openhab/core/items/TimeSeriesListener.java b/bundles/org.openhab.core/src/main/java/org/openhab/core/items/TimeSeriesListener.java new file mode 100644 index 00000000000..dbc44f8668e --- /dev/null +++ b/bundles/org.openhab.core/src/main/java/org/openhab/core/items/TimeSeriesListener.java @@ -0,0 +1,38 @@ +/** + * Copyright (c) 2010-2023 Contributors to the openHAB project + * + * See the NOTICE file(s) distributed with this work for additional + * information. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + * + * SPDX-License-Identifier: EPL-2.0 + */ +package org.openhab.core.items; + +import org.eclipse.jdt.annotation.NonNullByDefault; +import org.openhab.core.types.TimeSeries; + +/** + *

+ * This interface must be implemented by all classes that want to be notified about |@link TimeSeries} updates of an + * item. + * + *

+ * The {@link GenericItem} class provides the possibility to register such listeners. + * + * @author Jan N. Klug - Initial contribution + */ +@NonNullByDefault +public interface TimeSeriesListener { + + /** + * This method is called, if a time series update was sent to the item. + * + * @param item the item the timeseries was updated for + * @param timeSeries the time series + */ + void timeSeriesUpdated(Item item, TimeSeries timeSeries); +} diff --git a/bundles/org.openhab.core/src/main/java/org/openhab/core/items/events/AbstractItemEventSubscriber.java b/bundles/org.openhab.core/src/main/java/org/openhab/core/items/events/AbstractItemEventSubscriber.java index e9f323fc991..f3124152fd6 100644 --- a/bundles/org.openhab.core/src/main/java/org/openhab/core/items/events/AbstractItemEventSubscriber.java +++ b/bundles/org.openhab.core/src/main/java/org/openhab/core/items/events/AbstractItemEventSubscriber.java @@ -31,7 +31,8 @@ @NonNullByDefault public abstract class AbstractItemEventSubscriber implements EventSubscriber { - private final Set subscribedEventTypes = Set.of(ItemStateEvent.TYPE, ItemCommandEvent.TYPE); + private final Set subscribedEventTypes = Set.of(ItemStateEvent.TYPE, ItemCommandEvent.TYPE, + ItemTimeSeriesEvent.TYPE); @Override public Set getSubscribedEventTypes() { @@ -44,6 +45,8 @@ public void receive(Event event) { receiveUpdate(stateEvent); } else if (event instanceof ItemCommandEvent commandEvent) { receiveCommand(commandEvent); + } else if (event instanceof ItemTimeSeriesEvent timeSeriesEvent) { + receiveTimeSeries(timeSeriesEvent); } } @@ -66,4 +69,14 @@ protected void receiveUpdate(ItemStateEvent updateEvent) { // Default implementation: do nothing. // Can be implemented by subclass in order to handle item updates. } + + /** + * Callback method for receiving item timeseries events from the openHAB event bus. + * + * @param timeSeriesEvent the timeseries event + */ + protected void receiveTimeSeries(ItemTimeSeriesEvent timeSeriesEvent) { + // Default implementation: do nothing. + // Can be implemented by subclass in order to handle timeseries updates. + } } diff --git a/bundles/org.openhab.core/src/main/java/org/openhab/core/items/events/ItemEventFactory.java b/bundles/org.openhab.core/src/main/java/org/openhab/core/items/events/ItemEventFactory.java index 5f4bdbc460d..6225c4c7490 100644 --- a/bundles/org.openhab.core/src/main/java/org/openhab/core/items/events/ItemEventFactory.java +++ b/bundles/org.openhab.core/src/main/java/org/openhab/core/items/events/ItemEventFactory.java @@ -14,6 +14,7 @@ import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; +import java.time.Instant; import java.util.LinkedList; import java.util.List; import java.util.Set; @@ -29,6 +30,7 @@ import org.openhab.core.types.Command; import org.openhab.core.types.RefreshType; import org.openhab.core.types.State; +import org.openhab.core.types.TimeSeries; import org.openhab.core.types.Type; import org.openhab.core.types.UnDefType; import org.osgi.service.component.annotations.Component; @@ -51,6 +53,8 @@ public class ItemEventFactory extends AbstractEventFactory { private static final String ITEM_STATE_EVENT_TOPIC = "openhab/items/{itemName}/state"; private static final String ITEM_STATE_UPDATED_EVENT_TOPIC = "openhab/items/{itemName}/stateupdated"; + private static final String ITEM_TIME_SERIES_EVENT_TOPIC = "openhab/items/{itemName}/timeseries"; + private static final String ITEM_TIME_SERIES_UPDATED_EVENT_TOPIC = "openhab/items/{itemName}/timeseriesupdated"; private static final String ITEM_STATE_PREDICTED_EVENT_TOPIC = "openhab/items/{itemName}/statepredicted"; @@ -72,7 +76,8 @@ public class ItemEventFactory extends AbstractEventFactory { public ItemEventFactory() { super(Set.of(ItemCommandEvent.TYPE, ItemStateEvent.TYPE, ItemStatePredictedEvent.TYPE, ItemStateUpdatedEvent.TYPE, ItemStateChangedEvent.TYPE, ItemAddedEvent.TYPE, ItemUpdatedEvent.TYPE, - ItemRemovedEvent.TYPE, GroupStateUpdatedEvent.TYPE, GroupItemStateChangedEvent.TYPE)); + ItemRemovedEvent.TYPE, GroupStateUpdatedEvent.TYPE, GroupItemStateChangedEvent.TYPE, + ItemTimeSeriesEvent.TYPE, ItemTimeSeriesUpdatedEvent.TYPE)); } @Override @@ -88,6 +93,10 @@ protected Event createEventByType(String eventType, String topic, String payload return createStateUpdatedEvent(topic, payload); } else if (ItemStateChangedEvent.TYPE.equals(eventType)) { return createStateChangedEvent(topic, payload); + } else if (ItemTimeSeriesEvent.TYPE.equals(eventType)) { + return createTimeSeriesEvent(topic, payload); + } else if (ItemTimeSeriesUpdatedEvent.TYPE.equals(eventType)) { + return createTimeSeriesUpdatedEvent(topic, payload); } else if (ItemAddedEvent.TYPE.equals(eventType)) { return createAddedEvent(topic, payload); } else if (ItemUpdatedEvent.TYPE.equals(eventType)) { @@ -155,6 +164,20 @@ private Event createStateChangedEvent(String topic, String payload) { return new ItemStateChangedEvent(topic, payload, itemName, state, oldState); } + private Event createTimeSeriesEvent(String topic, String payload) { + String itemName = getItemName(topic); + ItemTimeSeriesEventPayloadBean bean = deserializePayload(payload, ItemTimeSeriesEventPayloadBean.class); + TimeSeries timeSeries = bean.getTimeSeries(); + return new ItemTimeSeriesEvent(topic, payload, itemName, timeSeries, null); + } + + private Event createTimeSeriesUpdatedEvent(String topic, String payload) { + String itemName = getItemName(topic); + ItemTimeSeriesEventPayloadBean bean = deserializePayload(payload, ItemTimeSeriesEventPayloadBean.class); + TimeSeries timeSeries = bean.getTimeSeries(); + return new ItemTimeSeriesUpdatedEvent(topic, payload, itemName, timeSeries, null); + } + private State getState(String type, String value) { return parseType(type, value, State.class); } @@ -175,7 +198,7 @@ private String getMemberName(String topic) { return topicElements[3]; } - private T parseType(String typeName, String valueToParse, Class desiredClass) { + private static T parseType(String typeName, String valueToParse, Class desiredClass) { Object parsedObject = null; String simpleClassName = typeName + TYPE_POSTFIX; parsedObject = parseSimpleClassName(simpleClassName, valueToParse); @@ -190,7 +213,7 @@ private T parseType(String typeName, String valueToParse, Class desiredCl return desiredClass.cast(parsedObject); } - private @Nullable Object parseSimpleClassName(String simpleClassName, String valueToParse) { + private static @Nullable Object parseSimpleClassName(String simpleClassName, String valueToParse) { if (simpleClassName.equals(UnDefType.class.getSimpleName())) { return UnDefType.valueOf(valueToParse); } @@ -320,6 +343,22 @@ public static ItemStateUpdatedEvent createStateUpdatedEvent(String itemName, Sta return new ItemStateUpdatedEvent(topic, payload, itemName, state, source); } + public static ItemTimeSeriesEvent createTimeSeriesEvent(String itemName, TimeSeries timeSeries, + @Nullable String source) { + String topic = buildTopic(ITEM_TIME_SERIES_EVENT_TOPIC, itemName); + ItemTimeSeriesEventPayloadBean bean = new ItemTimeSeriesEventPayloadBean(timeSeries); + String payload = serializePayload(bean); + return new ItemTimeSeriesEvent(topic, payload, itemName, timeSeries, source); + } + + public static ItemTimeSeriesUpdatedEvent createTimeSeriesUpdatedEvent(String itemName, TimeSeries timeSeries, + @Nullable String source) { + String topic = buildTopic(ITEM_TIME_SERIES_UPDATED_EVENT_TOPIC, itemName); + ItemTimeSeriesEventPayloadBean bean = new ItemTimeSeriesEventPayloadBean(timeSeries); + String payload = serializePayload(bean); + return new ItemTimeSeriesUpdatedEvent(topic, payload, itemName, timeSeries, source); + } + /** * Creates a group item state updated event. * @@ -585,4 +624,58 @@ public String getOldValue() { return oldValue; } } + + private static class ItemTimeSeriesEventPayloadBean { + private @NonNullByDefault({}) List timeSeries; + private @NonNullByDefault({}) String policy; + + @SuppressWarnings("unused") + private ItemTimeSeriesEventPayloadBean() { + // do not remove, GSON needs it + } + + public ItemTimeSeriesEventPayloadBean(TimeSeries timeSeries) { + this.timeSeries = timeSeries.getStates().map(TimeSeriesPayload::new).toList(); + this.policy = timeSeries.getPolicy().name(); + } + + public TimeSeries getTimeSeries() { + TimeSeries timeSeries1 = new TimeSeries(TimeSeries.Policy.valueOf(policy)); + timeSeries.forEach(e -> { + State state = parseType(e.getType(), e.getValue(), State.class); + Instant instant = Instant.parse(e.getTimestamp()); + timeSeries1.add(instant, state); + }); + return timeSeries1; + } + + private static class TimeSeriesPayload { + private @NonNullByDefault({}) String type; + private @NonNullByDefault({}) String value; + private @NonNullByDefault({}) String timestamp; + + @SuppressWarnings("unused") + private TimeSeriesPayload() { + // do not remove, GSON needs it + } + + public TimeSeriesPayload(TimeSeries.Entry entry) { + type = getStateType(entry.state()); + value = entry.state().toFullString(); + timestamp = entry.timestamp().toString(); + } + + public String getType() { + return type; + } + + public String getValue() { + return value; + } + + public String getTimestamp() { + return timestamp; + } + } + } } diff --git a/bundles/org.openhab.core/src/main/java/org/openhab/core/items/events/ItemTimeSeriesEvent.java b/bundles/org.openhab.core/src/main/java/org/openhab/core/items/events/ItemTimeSeriesEvent.java new file mode 100644 index 00000000000..2b8e0162efe --- /dev/null +++ b/bundles/org.openhab.core/src/main/java/org/openhab/core/items/events/ItemTimeSeriesEvent.java @@ -0,0 +1,65 @@ +/** + * Copyright (c) 2010-2023 Contributors to the openHAB project + * + * See the NOTICE file(s) distributed with this work for additional + * information. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + * + * SPDX-License-Identifier: EPL-2.0 + */ +package org.openhab.core.items.events; + +import org.eclipse.jdt.annotation.NonNullByDefault; +import org.eclipse.jdt.annotation.Nullable; +import org.openhab.core.types.TimeSeries; + +/** + * The {@link ItemTimeSeriesEvent} can be used to report item time series updates through the openHAB event bus. + * Time series events must be created with the {@link ItemEventFactory}. + * + * @author Jan N. Klug - Initial contribution + */ +@NonNullByDefault +public class ItemTimeSeriesEvent extends ItemEvent { + + public static final String TYPE = ItemTimeSeriesEvent.class.getSimpleName(); + + protected final TimeSeries timeSeries; + + /** + * Constructs a new item time series event. + * + * @param topic the topic + * @param payload the payload + * @param itemName the item name + * @param timeSeries the time series + * @param source the source, can be null + */ + protected ItemTimeSeriesEvent(String topic, String payload, String itemName, TimeSeries timeSeries, + @Nullable String source) { + super(topic, payload, itemName, source); + this.timeSeries = timeSeries; + } + + @Override + public String getType() { + return TYPE; + } + + /** + * Gets the item time series. + * + * @return the item time series + */ + public TimeSeries getTimeSeries() { + return timeSeries; + } + + @Override + public String toString() { + return String.format("Item '%s' shall process timeseries %s", itemName, timeSeries.getStates().toList()); + } +} diff --git a/bundles/org.openhab.core/src/main/java/org/openhab/core/items/events/ItemTimeSeriesUpdatedEvent.java b/bundles/org.openhab.core/src/main/java/org/openhab/core/items/events/ItemTimeSeriesUpdatedEvent.java new file mode 100644 index 00000000000..de3419d267a --- /dev/null +++ b/bundles/org.openhab.core/src/main/java/org/openhab/core/items/events/ItemTimeSeriesUpdatedEvent.java @@ -0,0 +1,65 @@ +/** + * Copyright (c) 2010-2023 Contributors to the openHAB project + * + * See the NOTICE file(s) distributed with this work for additional + * information. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + * + * SPDX-License-Identifier: EPL-2.0 + */ +package org.openhab.core.items.events; + +import org.eclipse.jdt.annotation.NonNullByDefault; +import org.eclipse.jdt.annotation.Nullable; +import org.openhab.core.types.TimeSeries; + +/** + * The {@link ItemTimeSeriesUpdatedEvent} can be used to report item time series updates through the openHAB event bus. + * Time series events must be created with the {@link ItemEventFactory}. + * + * @author Jan N. Klug - Initial contribution + */ +@NonNullByDefault +public class ItemTimeSeriesUpdatedEvent extends ItemEvent { + + public static final String TYPE = ItemTimeSeriesUpdatedEvent.class.getSimpleName(); + + protected final TimeSeries timeSeries; + + /** + * Constructs a new item time series updated event. + * + * @param topic the topic + * @param payload the payload + * @param itemName the item name + * @param timeSeries the time series + * @param source the source, can be null + */ + protected ItemTimeSeriesUpdatedEvent(String topic, String payload, String itemName, TimeSeries timeSeries, + @Nullable String source) { + super(topic, payload, itemName, source); + this.timeSeries = timeSeries; + } + + @Override + public String getType() { + return TYPE; + } + + /** + * Gets the item time series. + * + * @return the item time series + */ + public TimeSeries getTimeSeries() { + return timeSeries; + } + + @Override + public String toString() { + return String.format("Item '%s' updated timeseries %s", itemName, timeSeries.getStates().toList()); + } +} diff --git a/bundles/org.openhab.core/src/main/java/org/openhab/core/library/items/CallItem.java b/bundles/org.openhab.core/src/main/java/org/openhab/core/library/items/CallItem.java index 73c87b9a9f6..e6197ef749d 100644 --- a/bundles/org.openhab.core/src/main/java/org/openhab/core/library/items/CallItem.java +++ b/bundles/org.openhab.core/src/main/java/org/openhab/core/library/items/CallItem.java @@ -21,6 +21,7 @@ import org.openhab.core.types.Command; import org.openhab.core.types.RefreshType; import org.openhab.core.types.State; +import org.openhab.core.types.TimeSeries; import org.openhab.core.types.UnDefType; /** @@ -53,9 +54,18 @@ public List> getAcceptedCommandTypes() { @Override public void setState(State state) { if (isAcceptedState(ACCEPTED_DATA_TYPES, state)) { - super.setState(state); + applyState(state); } else { logSetTypeError(state); } } + + @Override + public void setTimeSeries(TimeSeries timeSeries) { + if (timeSeries.getStates().allMatch(s -> isAcceptedState(ACCEPTED_DATA_TYPES, s.state()))) { + applyTimeSeries(timeSeries); + } else { + logSetTypeError(timeSeries); + } + } } diff --git a/bundles/org.openhab.core/src/main/java/org/openhab/core/library/items/ColorItem.java b/bundles/org.openhab.core/src/main/java/org/openhab/core/library/items/ColorItem.java index 45afe755309..fa2beba2039 100644 --- a/bundles/org.openhab.core/src/main/java/org/openhab/core/library/items/ColorItem.java +++ b/bundles/org.openhab.core/src/main/java/org/openhab/core/library/items/ColorItem.java @@ -25,6 +25,7 @@ import org.openhab.core.types.Command; import org.openhab.core.types.RefreshType; import org.openhab.core.types.State; +import org.openhab.core.types.TimeSeries; import org.openhab.core.types.UnDefType; /** @@ -92,4 +93,13 @@ public void setState(State state) { logSetTypeError(state); } } + + @Override + public void setTimeSeries(TimeSeries timeSeries) { + if (timeSeries.getStates().allMatch(s -> s.state() instanceof HSBType)) { + applyTimeSeries(timeSeries); + } else { + logSetTypeError(timeSeries); + } + } } diff --git a/bundles/org.openhab.core/src/main/java/org/openhab/core/library/items/ContactItem.java b/bundles/org.openhab.core/src/main/java/org/openhab/core/library/items/ContactItem.java index e64cc907cdd..843043d8b29 100644 --- a/bundles/org.openhab.core/src/main/java/org/openhab/core/library/items/ContactItem.java +++ b/bundles/org.openhab.core/src/main/java/org/openhab/core/library/items/ContactItem.java @@ -21,6 +21,7 @@ import org.openhab.core.types.Command; import org.openhab.core.types.RefreshType; import org.openhab.core.types.State; +import org.openhab.core.types.TimeSeries; import org.openhab.core.types.UnDefType; /** @@ -53,9 +54,18 @@ public List> getAcceptedCommandTypes() { @Override public void setState(State state) { if (isAcceptedState(ACCEPTED_DATA_TYPES, state)) { - super.setState(state); + applyState(state); } else { logSetTypeError(state); } } + + @Override + public void setTimeSeries(TimeSeries timeSeries) { + if (timeSeries.getStates().allMatch(s -> s.state() instanceof OpenClosedType)) { + applyTimeSeries(timeSeries); + } else { + logSetTypeError(timeSeries); + } + } } diff --git a/bundles/org.openhab.core/src/main/java/org/openhab/core/library/items/DateTimeItem.java b/bundles/org.openhab.core/src/main/java/org/openhab/core/library/items/DateTimeItem.java index 3c4ccfa3ddc..1556d32cac2 100644 --- a/bundles/org.openhab.core/src/main/java/org/openhab/core/library/items/DateTimeItem.java +++ b/bundles/org.openhab.core/src/main/java/org/openhab/core/library/items/DateTimeItem.java @@ -21,6 +21,7 @@ import org.openhab.core.types.Command; import org.openhab.core.types.RefreshType; import org.openhab.core.types.State; +import org.openhab.core.types.TimeSeries; import org.openhab.core.types.UnDefType; /** @@ -69,4 +70,13 @@ public void setState(State state) { logSetTypeError(state); } } + + @Override + public void setTimeSeries(TimeSeries timeSeries) { + if (timeSeries.getStates().allMatch(s -> s.state() instanceof DateTimeType)) { + applyTimeSeries(timeSeries); + } else { + logSetTypeError(timeSeries); + } + } } diff --git a/bundles/org.openhab.core/src/main/java/org/openhab/core/library/items/DimmerItem.java b/bundles/org.openhab.core/src/main/java/org/openhab/core/library/items/DimmerItem.java index a3cf6f0ec5e..898fbca1bb6 100644 --- a/bundles/org.openhab.core/src/main/java/org/openhab/core/library/items/DimmerItem.java +++ b/bundles/org.openhab.core/src/main/java/org/openhab/core/library/items/DimmerItem.java @@ -22,6 +22,7 @@ import org.openhab.core.types.Command; import org.openhab.core.types.RefreshType; import org.openhab.core.types.State; +import org.openhab.core.types.TimeSeries; import org.openhab.core.types.UnDefType; /** @@ -79,4 +80,13 @@ public void setState(State state) { logSetTypeError(state); } } + + @Override + public void setTimeSeries(TimeSeries timeSeries) { + if (timeSeries.getStates().allMatch(s -> s.state() instanceof PercentType)) { + super.applyTimeSeries(timeSeries); + } else { + logSetTypeError(timeSeries); + } + } } diff --git a/bundles/org.openhab.core/src/main/java/org/openhab/core/library/items/ImageItem.java b/bundles/org.openhab.core/src/main/java/org/openhab/core/library/items/ImageItem.java index b2a830c2a61..c0567f2e85b 100644 --- a/bundles/org.openhab.core/src/main/java/org/openhab/core/library/items/ImageItem.java +++ b/bundles/org.openhab.core/src/main/java/org/openhab/core/library/items/ImageItem.java @@ -21,6 +21,7 @@ import org.openhab.core.types.Command; import org.openhab.core.types.RefreshType; import org.openhab.core.types.State; +import org.openhab.core.types.TimeSeries; import org.openhab.core.types.UnDefType; /** @@ -51,9 +52,18 @@ public List> getAcceptedCommandTypes() { @Override public void setState(State state) { if (isAcceptedState(ACCEPTED_DATA_TYPES, state)) { - super.setState(state); + applyState(state); } else { logSetTypeError(state); } } + + @Override + public void setTimeSeries(TimeSeries timeSeries) { + if (timeSeries.getStates().allMatch(s -> s.state() instanceof RawType)) { + applyTimeSeries(timeSeries); + } else { + logSetTypeError(timeSeries); + } + } } diff --git a/bundles/org.openhab.core/src/main/java/org/openhab/core/library/items/LocationItem.java b/bundles/org.openhab.core/src/main/java/org/openhab/core/library/items/LocationItem.java index 3d8f691e7fe..b01601597d7 100644 --- a/bundles/org.openhab.core/src/main/java/org/openhab/core/library/items/LocationItem.java +++ b/bundles/org.openhab.core/src/main/java/org/openhab/core/library/items/LocationItem.java @@ -23,6 +23,7 @@ import org.openhab.core.types.Command; import org.openhab.core.types.RefreshType; import org.openhab.core.types.State; +import org.openhab.core.types.TimeSeries; import org.openhab.core.types.UnDefType; /** @@ -75,9 +76,18 @@ public DecimalType distanceFrom(@Nullable LocationItem awayItem) { @Override public void setState(State state) { if (isAcceptedState(ACCEPTED_DATA_TYPES, state)) { - super.setState(state); + applyState(state); } else { logSetTypeError(state); } } + + @Override + public void setTimeSeries(TimeSeries timeSeries) { + if (timeSeries.getStates().allMatch(s -> s.state() instanceof PointType)) { + applyTimeSeries(timeSeries); + } else { + logSetTypeError(timeSeries); + } + } } diff --git a/bundles/org.openhab.core/src/main/java/org/openhab/core/library/items/NumberItem.java b/bundles/org.openhab.core/src/main/java/org/openhab/core/library/items/NumberItem.java index e7218508092..15be66124a2 100644 --- a/bundles/org.openhab.core/src/main/java/org/openhab/core/library/items/NumberItem.java +++ b/bundles/org.openhab.core/src/main/java/org/openhab/core/library/items/NumberItem.java @@ -14,6 +14,7 @@ import java.util.List; import java.util.Locale; +import java.util.Objects; import javax.measure.Dimension; import javax.measure.Quantity; @@ -35,6 +36,7 @@ import org.openhab.core.types.State; import org.openhab.core.types.StateDescription; import org.openhab.core.types.StateDescriptionFragmentBuilder; +import org.openhab.core.types.TimeSeries; import org.openhab.core.types.UnDefType; import org.openhab.core.types.util.UnitUtils; import org.slf4j.Logger; @@ -136,13 +138,12 @@ public void send(QuantityType command) { return dimension; } - @Override - public void setState(State state) { + private @Nullable State getInternalState(State state) { if (state instanceof QuantityType quantityType) { if (dimension == null) { // QuantityType update to a NumberItem without unit, strip unit DecimalType plainState = new DecimalType(quantityType.toBigDecimal()); - super.applyState(plainState); + return plainState; } else { // QuantityType update to a NumberItem with unit, convert to item unit (if possible) Unit stateUnit = quantityType.getUnit(); @@ -150,7 +151,7 @@ public void setState(State state) { ? quantityType.toInvertibleUnit(unit) : null; if (convertedState != null) { - super.applyState(convertedState); + return convertedState; } else { logger.warn("Failed to update item '{}' because '{}' could not be converted to the item unit '{}'", name, state, unit); @@ -159,18 +160,44 @@ public void setState(State state) { } else if (state instanceof DecimalType decimalType) { if (dimension == null) { // DecimalType update to NumberItem with unit - super.applyState(decimalType); + return decimalType; } else { // DecimalType update for a NumberItem with dimension, convert to QuantityType - super.applyState(new QuantityType<>(decimalType.doubleValue(), unit)); + return new QuantityType<>(decimalType.doubleValue(), unit); + } + } + return null; + } + + @Override + public void setState(State state) { + if (state instanceof DecimalType || state instanceof QuantityType) { + State internalState = getInternalState(state); + if (internalState != null) { + applyState(internalState); } } else if (state instanceof UnDefType) { - super.applyState(state); + applyState(state); } else { logSetTypeError(state); } } + @Override + public void setTimeSeries(TimeSeries timeSeries) { + TimeSeries internalSeries = new TimeSeries(timeSeries.getPolicy()); + timeSeries.getStates().forEach(s -> internalSeries.add(s.timestamp(), + Objects.requireNonNullElse(getInternalState(s.state()), UnDefType.NULL))); + + if (dimension != null && internalSeries.getStates().allMatch(s -> s.state() instanceof QuantityType)) { + applyTimeSeries(internalSeries); + } else if (internalSeries.getStates().allMatch(s -> s.state() instanceof DecimalType)) { + applyTimeSeries(internalSeries); + } else { + logSetTypeError(timeSeries); + } + } + /** * Returns the optional unit symbol for this {@link NumberItem}. * diff --git a/bundles/org.openhab.core/src/main/java/org/openhab/core/library/items/PlayerItem.java b/bundles/org.openhab.core/src/main/java/org/openhab/core/library/items/PlayerItem.java index a1783e779dc..e2d6c4487e2 100644 --- a/bundles/org.openhab.core/src/main/java/org/openhab/core/library/items/PlayerItem.java +++ b/bundles/org.openhab.core/src/main/java/org/openhab/core/library/items/PlayerItem.java @@ -23,6 +23,7 @@ import org.openhab.core.types.Command; import org.openhab.core.types.RefreshType; import org.openhab.core.types.State; +import org.openhab.core.types.TimeSeries; import org.openhab.core.types.UnDefType; /** @@ -71,9 +72,19 @@ public void send(NextPreviousType command) { @Override public void setState(State state) { if (isAcceptedState(ACCEPTED_DATA_TYPES, state)) { - super.setState(state); + applyState(state); } else { logSetTypeError(state); } } + + @Override + public void setTimeSeries(TimeSeries timeSeries) { + if (timeSeries.getStates() + .allMatch(s -> s.state() instanceof PlayPauseType || s.state() instanceof RewindFastforwardType)) { + applyTimeSeries(timeSeries); + } else { + logSetTypeError(timeSeries); + } + } } diff --git a/bundles/org.openhab.core/src/main/java/org/openhab/core/library/items/RollershutterItem.java b/bundles/org.openhab.core/src/main/java/org/openhab/core/library/items/RollershutterItem.java index c8e91477df6..c838b5c5348 100644 --- a/bundles/org.openhab.core/src/main/java/org/openhab/core/library/items/RollershutterItem.java +++ b/bundles/org.openhab.core/src/main/java/org/openhab/core/library/items/RollershutterItem.java @@ -23,6 +23,7 @@ import org.openhab.core.types.Command; import org.openhab.core.types.RefreshType; import org.openhab.core.types.State; +import org.openhab.core.types.TimeSeries; import org.openhab.core.types.UnDefType; /** @@ -80,4 +81,13 @@ public void setState(State state) { logSetTypeError(state); } } + + @Override + public void setTimeSeries(TimeSeries timeSeries) { + if (timeSeries.getStates().allMatch(s -> isAcceptedState(ACCEPTED_DATA_TYPES, s.state()))) { + applyTimeSeries(timeSeries); + } else { + logSetTypeError(timeSeries); + } + } } diff --git a/bundles/org.openhab.core/src/main/java/org/openhab/core/library/items/StringItem.java b/bundles/org.openhab.core/src/main/java/org/openhab/core/library/items/StringItem.java index 6c5a960afec..da2650b45b9 100644 --- a/bundles/org.openhab.core/src/main/java/org/openhab/core/library/items/StringItem.java +++ b/bundles/org.openhab.core/src/main/java/org/openhab/core/library/items/StringItem.java @@ -24,6 +24,7 @@ import org.openhab.core.types.Command; import org.openhab.core.types.RefreshType; import org.openhab.core.types.State; +import org.openhab.core.types.TimeSeries; import org.openhab.core.types.TypeParser; import org.openhab.core.types.UnDefType; @@ -76,9 +77,18 @@ public List> getAcceptedCommandTypes() { @Override public void setState(State state) { if (isAcceptedState(ACCEPTED_DATA_TYPES, state)) { - super.setState(state); + applyState(state); } else { logSetTypeError(state); } } + + @Override + public void setTimeSeries(TimeSeries timeSeries) { + if (timeSeries.getStates().allMatch(s -> s.state() instanceof StringType)) { + applyTimeSeries(timeSeries); + } else { + logSetTypeError(timeSeries); + } + } } diff --git a/bundles/org.openhab.core/src/main/java/org/openhab/core/library/items/SwitchItem.java b/bundles/org.openhab.core/src/main/java/org/openhab/core/library/items/SwitchItem.java index d24c1d05fd0..37ae2258a99 100644 --- a/bundles/org.openhab.core/src/main/java/org/openhab/core/library/items/SwitchItem.java +++ b/bundles/org.openhab.core/src/main/java/org/openhab/core/library/items/SwitchItem.java @@ -21,6 +21,7 @@ import org.openhab.core.types.Command; import org.openhab.core.types.RefreshType; import org.openhab.core.types.State; +import org.openhab.core.types.TimeSeries; import org.openhab.core.types.UnDefType; /** @@ -61,9 +62,18 @@ public List> getAcceptedCommandTypes() { @Override public void setState(State state) { if (isAcceptedState(ACCEPTED_DATA_TYPES, state)) { - super.setState(state); + applyState(state); } else { logSetTypeError(state); } } + + @Override + public void setTimeSeries(TimeSeries timeSeries) { + if (timeSeries.getStates().allMatch(s -> s.state() instanceof OnOffType)) { + applyTimeSeries(timeSeries); + } else { + logSetTypeError(timeSeries); + } + } } diff --git a/bundles/org.openhab.core/src/main/java/org/openhab/core/types/TimeSeries.java b/bundles/org.openhab.core/src/main/java/org/openhab/core/types/TimeSeries.java new file mode 100644 index 00000000000..709f5ff861f --- /dev/null +++ b/bundles/org.openhab.core/src/main/java/org/openhab/core/types/TimeSeries.java @@ -0,0 +1,124 @@ +/** + * Copyright (c) 2010-2023 Contributors to the openHAB project + * + * See the NOTICE file(s) distributed with this work for additional + * information. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + * + * SPDX-License-Identifier: EPL-2.0 + */ +package org.openhab.core.types; + +import java.time.Instant; +import java.util.Comparator; +import java.util.List; +import java.util.Objects; +import java.util.TreeSet; +import java.util.stream.Stream; + +import org.eclipse.jdt.annotation.NonNullByDefault; +import org.eclipse.jdt.annotation.Nullable; + +/** + * The {@link TimeSeries} is used to transport a set of states together with their timestamp. + * It can be used for persisting historic state or forecasts. + * + * @author Jan N. Klug - Initial contribution + */ +@NonNullByDefault +public class TimeSeries { + private final TreeSet states = new TreeSet<>(Comparator.comparing(e -> e.timestamp)); + private final Policy policy; + + public TimeSeries(Policy policy) { + this.policy = policy; + } + + /** + * Get the persistence policy of this series. + *

+ * {@link Policy#ADD} add the content to the persistence, {@link Policy#REPLACE} first removes all persisted + * elements in the timespan given by {@link #getBegin()} and {@link #getEnd()}. + * + * @return + */ + public Policy getPolicy() { + return policy; + } + + /** + * Get the timestamp of the first element in this series. + * + * @return the {@link Instant} of the first element + */ + public Instant getBegin() { + return states.isEmpty() ? Instant.MAX : states.first().timestamp(); + } + + /** + * Get the timestamp of the last element in this series. + * + * @return the {@link Instant} of the last element + */ + public Instant getEnd() { + return states.isEmpty() ? Instant.MIN : states.last().timestamp(); + } + + /** + * Get the number of elements in this series. + * + * @return the number of elements + */ + public int size() { + return states.size(); + } + + /** + * Add a new element to this series. + *

+ * Elements can be added in an arbitrary order and are sorted chronologically. + * + * @param timestamp an {@link Instant} for the given state + * @param state the {@link State} at the given timestamp + */ + public void add(Instant timestamp, State state) { + states.add(new Entry(timestamp, state)); + } + + /** + * Get the content of this series. + *

+ * The entries are returned in chronological order, earlier entries before later entries. + * + * @return a {@link } with the content of this series. + */ + public Stream getStates() { + return List.copyOf(states).stream(); + } + + public record Entry(Instant timestamp, State state) { + } + + @Override + public boolean equals(@Nullable Object o) { + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; + TimeSeries that = (TimeSeries) o; + return Objects.equals(states, that.states) && policy == that.policy; + } + + @Override + public int hashCode() { + return Objects.hash(states, policy); + } + + public enum Policy { + ADD, + REPLACE + } +} diff --git a/bundles/org.openhab.core/src/test/java/org/openhab/core/types/TimeSeriesTest.java b/bundles/org.openhab.core/src/test/java/org/openhab/core/types/TimeSeriesTest.java new file mode 100644 index 00000000000..d4aaef3176b --- /dev/null +++ b/bundles/org.openhab.core/src/test/java/org/openhab/core/types/TimeSeriesTest.java @@ -0,0 +1,66 @@ +/** + * Copyright (c) 2010-2023 Contributors to the openHAB project + * + * See the NOTICE file(s) distributed with this work for additional + * information. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + * + * SPDX-License-Identifier: EPL-2.0 + */ +package org.openhab.core.types; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.is; + +import java.time.Instant; +import java.util.List; + +import org.eclipse.jdt.annotation.NonNullByDefault; +import org.junit.jupiter.api.Test; +import org.openhab.core.library.types.DecimalType; + +/** + * The {@link TimeSeriesTest} contains tests for {@link TimeSeries} + * + * @author Jan N. Klug - Initial contribution + */ +@NonNullByDefault +public class TimeSeriesTest { + + @Test + public void testAdditionOrderDoesNotMatter() { + Instant time1 = Instant.now(); + Instant time2 = time1.plusSeconds(1000); + Instant time3 = time1.minusSeconds(1000); + Instant time4 = time1.plusSeconds(50); + + TimeSeries timeSeries = new TimeSeries(TimeSeries.Policy.ADD); + assertThat(timeSeries.getPolicy(), is(TimeSeries.Policy.ADD)); + + timeSeries.add(time1, new DecimalType(time1.toEpochMilli())); + timeSeries.add(time2, new DecimalType(time2.toEpochMilli())); + timeSeries.add(time3, new DecimalType(time3.toEpochMilli())); + timeSeries.add(time4, new DecimalType(time4.toEpochMilli())); + + assertThat(timeSeries.size(), is(4)); + + // assert begin end time + assertThat(timeSeries.getBegin(), is(time3)); + assertThat(timeSeries.getEnd(), is(time2)); + + // assert order of events and content + List entries = timeSeries.getStates().toList(); + for (int i = 0; i < entries.size(); i++) { + if (i > 0) { + // assert order + assertThat(entries.get(i).timestamp(), is(greaterThan(entries.get(i - 1).timestamp()))); + } + assertThat(entries.get(i).timestamp().toEpochMilli(), + is(entries.get(i).state().as(DecimalType.class).longValue())); + } + } +} diff --git a/itests/org.openhab.core.tests/src/main/java/org/openhab/core/internal/items/ItemUpdaterOSGiTest.java b/itests/org.openhab.core.tests/src/main/java/org/openhab/core/internal/items/ItemUpdaterOSGiTest.java index e6e06fb3006..e24ad0aca14 100644 --- a/itests/org.openhab.core.tests/src/main/java/org/openhab/core/internal/items/ItemUpdaterOSGiTest.java +++ b/itests/org.openhab.core.tests/src/main/java/org/openhab/core/internal/items/ItemUpdaterOSGiTest.java @@ -14,11 +14,13 @@ import static org.junit.jupiter.api.Assertions.*; +import java.time.Instant; import java.util.Queue; import java.util.Set; import java.util.concurrent.ConcurrentLinkedQueue; import org.eclipse.jdt.annotation.NonNullByDefault; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.openhab.core.events.Event; @@ -28,9 +30,13 @@ import org.openhab.core.items.ItemRegistry; import org.openhab.core.items.events.ItemEventFactory; import org.openhab.core.items.events.ItemStateChangedEvent; +import org.openhab.core.items.events.ItemStateUpdatedEvent; +import org.openhab.core.items.events.ItemTimeSeriesUpdatedEvent; import org.openhab.core.library.items.SwitchItem; import org.openhab.core.library.types.OnOffType; import org.openhab.core.test.java.JavaOSGiTest; +import org.openhab.core.types.TimeSeries; +import org.openhab.core.types.UnDefType; /** * The {@link ItemUpdaterOSGiTest} runs inside an OSGi container and tests the {@link ItemRegistry}. @@ -44,6 +50,8 @@ public class ItemUpdaterOSGiTest extends JavaOSGiTest { private @NonNullByDefault({}) EventPublisher eventPublisher; private @NonNullByDefault({}) ItemRegistry itemRegistry; + private @NonNullByDefault({}) SwitchItem switchItem; + private final Queue receivedEvents = new ConcurrentLinkedQueue<>(); @BeforeEach @@ -55,7 +63,8 @@ public void setUp() { itemRegistry = getService(ItemRegistry.class); assertNotNull(itemRegistry); - itemRegistry.add(new SwitchItem("switch")); + switchItem = new SwitchItem("switch"); + itemRegistry.add(switchItem); EventSubscriber eventSubscriber = new EventSubscriber() { @Override @@ -65,12 +74,18 @@ public void receive(Event event) { @Override public Set getSubscribedEventTypes() { - return Set.of(ItemStateChangedEvent.TYPE); + return Set.of(ItemStateChangedEvent.TYPE, ItemStateUpdatedEvent.TYPE, ItemTimeSeriesUpdatedEvent.TYPE); } }; registerService(eventSubscriber); } + @AfterEach + public void tearDown() { + receivedEvents.clear(); + itemRegistry.remove(switchItem.getName()); + } + @Test public void testItemUpdaterSetsItemState() { eventPublisher.post(ItemEventFactory.createStateEvent("switch", OnOffType.ON)); @@ -79,32 +94,91 @@ public void testItemUpdaterSetsItemState() { } @Test - public void testItemUpdaterSendsStateChangedEvent() throws Exception { + public void testItemUpdaterSendsStateUpdatedEvent() throws Exception { eventPublisher.post(ItemEventFactory.createStateEvent("switch", OnOffType.ON)); Item switchItem = itemRegistry.get("switch"); waitForAssert(() -> assertEquals(OnOffType.ON, switchItem.getState())); + // wait for the initial events (updated and changed, because it was NULL before) + waitForAssert(() -> { + assertEquals(2, receivedEvents.size()); + ItemStateUpdatedEvent updatedEvent = (ItemStateUpdatedEvent) receivedEvents.poll(); + assertNotNull(updatedEvent); + assertEquals(OnOffType.ON, updatedEvent.getItemState()); + ItemStateChangedEvent changedEvent = (ItemStateChangedEvent) receivedEvents.poll(); + assertNotNull(changedEvent); + assertEquals(UnDefType.NULL, changedEvent.getOldItemState()); + assertEquals(OnOffType.ON, changedEvent.getItemState()); + }); + + // update with same value + eventPublisher.post(ItemEventFactory.createStateEvent("switch", OnOffType.ON)); + + // wait for the updated event + waitForAssert(() -> { + assertEquals(1, receivedEvents.size()); + ItemStateUpdatedEvent updatedEvent = (ItemStateUpdatedEvent) receivedEvents.poll(); + assertNotNull(updatedEvent); + assertEquals(OnOffType.ON, updatedEvent.getItemState()); + }); + + // ensure no other events send + Thread.sleep(1000); + assertTrue(receivedEvents.isEmpty()); + } + + @Test + public void testItemUpdaterSendsStateChangedEvent() throws Exception { + eventPublisher.post(ItemEventFactory.createStateEvent("switch", OnOffType.ON)); + + // wait for the initial events (updated and changed, because it was NULL before) + waitForAssert(() -> { + assertEquals(2, receivedEvents.size()); + ItemStateUpdatedEvent updatedEvent = (ItemStateUpdatedEvent) receivedEvents.poll(); + assertNotNull(updatedEvent); + assertEquals(OnOffType.ON, updatedEvent.getItemState()); + ItemStateChangedEvent changedEvent = (ItemStateChangedEvent) receivedEvents.poll(); + assertNotNull(changedEvent); + assertEquals(UnDefType.NULL, changedEvent.getOldItemState()); + assertEquals(OnOffType.ON, changedEvent.getItemState()); + }); + // change state eventPublisher.post(ItemEventFactory.createStateEvent("switch", OnOffType.OFF)); - // wait for an event that change the state from OFF to ON - // there could be one remaining event from the 'ItemUpdater sets item state' test + // wait for two events: the updated event and the changed event waitForAssert(() -> { - assertFalse(receivedEvents.isEmpty()); + assertEquals(2, receivedEvents.size()); + ItemStateUpdatedEvent updatedEvent = (ItemStateUpdatedEvent) receivedEvents.poll(); + assertNotNull(updatedEvent); + assertEquals(OnOffType.OFF, updatedEvent.getItemState()); ItemStateChangedEvent changedEvent = (ItemStateChangedEvent) receivedEvents.poll(); assertNotNull(changedEvent); assertEquals(OnOffType.ON, changedEvent.getOldItemState()); assertEquals(OnOffType.OFF, changedEvent.getItemState()); }); - // send update for same state - eventPublisher.post(ItemEventFactory.createStateEvent("switch", OnOffType.OFF)); + // wait a second and make sure no other events have been sent + Thread.sleep(1000); + assertTrue(receivedEvents.isEmpty()); + } - // wait a few milliseconds - Thread.sleep(100); + @Test + public void testItemUpdaterSetsTimeSeries() throws InterruptedException { + TimeSeries timeSeries = new TimeSeries(TimeSeries.Policy.ADD); + timeSeries.add(Instant.now(), OnOffType.ON); + eventPublisher.post(ItemEventFactory.createTimeSeriesEvent("switch", timeSeries, null)); + + // wait for the event + waitForAssert(() -> { + assertEquals(1, receivedEvents.size()); + ItemTimeSeriesUpdatedEvent updatedEvent = (ItemTimeSeriesUpdatedEvent) receivedEvents.poll(); + assertNotNull(updatedEvent); + assertEquals(timeSeries, updatedEvent.getTimeSeries()); + }); - // make sure no state changed event has been sent + Thread.sleep(1000); assertTrue(receivedEvents.isEmpty()); } } diff --git a/itests/org.openhab.core.thing.tests/src/main/java/org/openhab/core/thing/internal/CommunicationManagerOSGiTest.java b/itests/org.openhab.core.thing.tests/src/main/java/org/openhab/core/thing/internal/CommunicationManagerOSGiTest.java index b3893bce00c..8eae74a400e 100644 --- a/itests/org.openhab.core.thing.tests/src/main/java/org/openhab/core/thing/internal/CommunicationManagerOSGiTest.java +++ b/itests/org.openhab.core.thing.tests/src/main/java/org/openhab/core/thing/internal/CommunicationManagerOSGiTest.java @@ -72,12 +72,13 @@ import org.openhab.core.thing.profiles.ProfileFactory; import org.openhab.core.thing.profiles.ProfileTypeProvider; import org.openhab.core.thing.profiles.ProfileTypeUID; -import org.openhab.core.thing.profiles.StateProfile; +import org.openhab.core.thing.profiles.TimeSeriesProfile; import org.openhab.core.thing.profiles.TriggerProfile; import org.openhab.core.thing.type.ChannelKind; import org.openhab.core.thing.type.ChannelType; import org.openhab.core.thing.type.ChannelTypeUID; import org.openhab.core.types.Command; +import org.openhab.core.types.TimeSeries; /** * @@ -148,7 +149,7 @@ protected void addProvider(Provider provider) { private @Mock @NonNullByDefault({}) ItemStateConverter itemStateConverterMock; private @Mock @NonNullByDefault({}) ProfileAdvisor profileAdvisorMock; private @Mock @NonNullByDefault({}) ProfileFactory profileFactoryMock; - private @Mock @NonNullByDefault({}) StateProfile stateProfileMock; + private @Mock @NonNullByDefault({}) TimeSeriesProfile stateProfileMock; private @Mock @NonNullByDefault({}) ThingHandler thingHandlerMock; private @Mock @NonNullByDefault({}) ThingRegistry thingRegistryMock; private @Mock @NonNullByDefault({}) TriggerProfile triggerProfileMock; @@ -272,6 +273,32 @@ public void testPostCommandMultiLink() { verifyNoMoreInteractions(triggerProfileMock); } + @Test + public void testTimeSeriesSingleLink() { + TimeSeries timeSeries = new TimeSeries(TimeSeries.Policy.REPLACE); + + manager.sendTimeSeries(STATE_CHANNEL_UID_1, timeSeries); + + waitForAssert(() -> { + verify(stateProfileMock).onTimeSeriesFromHandler(eq(timeSeries)); + }); + verifyNoMoreInteractions(stateProfileMock); + verifyNoMoreInteractions(triggerProfileMock); + } + + @Test + public void testTimeSeriesMultiLink() { + TimeSeries timeSeries = new TimeSeries(TimeSeries.Policy.REPLACE); + + manager.sendTimeSeries(STATE_CHANNEL_UID_2, timeSeries); + + waitForAssert(() -> { + verify(stateProfileMock, times(2)).onTimeSeriesFromHandler(eq(timeSeries)); + }); + verifyNoMoreInteractions(stateProfileMock); + verifyNoMoreInteractions(triggerProfileMock); + } + @Test public void testItemCommandEventSingleLink() { manager.receive(ItemEventFactory.createCommandEvent(ITEM_NAME_2, OnOffType.ON));