Skip to content

Commit

Permalink
further work
Browse files Browse the repository at this point in the history
Signed-off-by: Jan N. Klug <[email protected]>
  • Loading branch information
J-N-K committed May 18, 2023
1 parent b8a9f9b commit bb63574
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,7 @@ public void timeSeriesUpdated(Item item, TimeSeries timeSeries) {
FilterCriteria removeFilter = new FilterCriteria().setItemName(item.getName())
.setBeginDate(begin).setEndDate(end);
service.remove(removeFilter);
container.removeScheduledTimeSeries(item.getName(), timeSeries);
}
// update states
timeSeries.getStates().forEach(
Expand Down Expand Up @@ -489,6 +490,18 @@ public void scheduleNewTimeSeries(String itemName, TimeSeries timeSeries) {
logger.debug("TimeSeries: Total scheduled forecasted values for {} is {}", itemName, jobs.size());
}

public void removeScheduledTimeSeries(String itemName, TimeSeries timeSeries) {
List<ScheduledCompletableFuture<?>> jobs = forecastJobs.computeIfAbsent(itemName,
i -> new CopyOnWriteArrayList<>());
ZonedDateTime begin = timeSeries.getBegin().atZone(ZoneId.systemDefault()).minusSeconds(1);
ZonedDateTime end = timeSeries.getEnd().atZone(ZoneId.systemDefault()).plusSeconds(1);
jobs.forEach(job -> {
if (job.getScheduledTime().isAfter(begin) && job.getScheduledTime().isBefore(end)) {
job.cancel(true);
}
});
}

public void addItem(Item item) {
if (persistenceService instanceof QueryablePersistenceService) {
if (UnDefType.NULL.equals(item.getState())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,11 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.Mockito.clearInvocations;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
Expand All @@ -27,6 +30,7 @@
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.List;

import org.eclipse.jdt.annotation.NonNullByDefault;
Expand Down Expand Up @@ -317,57 +321,70 @@ public void noRestoreOnStartupWhenItemNotNull() {

@Test
public void storeTimeSeriesAndForecastsScheduled() {
List<ScheduledCompletableFuture<?>> futures = new ArrayList<>();

when(schedulerMock.at(any(SchedulerRunnable.class), any(Instant.class))).thenAnswer(i -> {
ScheduledCompletableFuture<?> future = mock(ScheduledCompletableFuture.class);
when(future.getScheduledTime()).thenReturn(((Instant) i.getArgument(1)).atZone(ZoneId.systemDefault()));
futures.add(future);
return future;
});

addConfiguration(TEST_MODIFIABLE_PERSISTENCE_SERVICE_ID, new PersistenceAllConfig(),
PersistenceStrategy.Globals.FORECAST, null);

Instant time1 = Instant.now().minusSeconds(1000);
Instant time2 = Instant.now().plusSeconds(1000);
Instant time3 = Instant.now().plusSeconds(2000);
Instant time4 = Instant.now().plusSeconds(3000);

// add elements

TimeSeries timeSeries = new TimeSeries(TimeSeries.Policy.ADD);
timeSeries.add(time1, new StringType("one"));
timeSeries.add(time2, new StringType("two"));
timeSeries.add(time2, new StringType("three"));
timeSeries.add(time3, new StringType("three"));
timeSeries.add(time4, new StringType("four"));

manager.timeSeriesUpdated(TEST_ITEM, timeSeries);

verify(modifiablePersistenceServiceMock, times(3)).store(any(Item.class), any(ZonedDateTime.class),
InOrder inOrder = inOrder(modifiablePersistenceServiceMock, schedulerMock);

inOrder.verify(modifiablePersistenceServiceMock, times(4)).store(any(Item.class), any(ZonedDateTime.class),
any(State.class));
// first element not scheduled, because it is in the past
verify(schedulerMock).at(any(SchedulerRunnable.class), ArgumentMatchers.eq(time2));
verify(schedulerMock).at(any(SchedulerRunnable.class), ArgumentMatchers.eq(time3));

verifyNoMoreInteractions(safeCallerBuilderMock, modifiablePersistenceServiceMock);
}
inOrder.verify(schedulerMock).at(any(SchedulerRunnable.class), ArgumentMatchers.eq(time2));
inOrder.verify(schedulerMock).at(any(SchedulerRunnable.class), ArgumentMatchers.eq(time3));
inOrder.verify(schedulerMock).at(any(SchedulerRunnable.class), ArgumentMatchers.eq(time4));

@Test
public void storeTimeSeriesAndReplaceForecasts() {
addConfiguration(TEST_MODIFIABLE_PERSISTENCE_SERVICE_ID, new PersistenceAllConfig(),
PersistenceStrategy.Globals.FORECAST, null);

Instant time1 = Instant.now().minusSeconds(1000);
Instant time2 = Instant.now().plusSeconds(1000);
Instant time3 = Instant.now().plusSeconds(2000);
inOrder.verifyNoMoreInteractions();

TimeSeries timeSeries = new TimeSeries(TimeSeries.Policy.REPLACE);
timeSeries.add(time1, new StringType("one"));
timeSeries.add(time2, new StringType("two"));
timeSeries.add(time2, new StringType("three"));
// replace elements
TimeSeries timeSeries2 = new TimeSeries(TimeSeries.Policy.REPLACE);
timeSeries2.add(time3, new StringType("three2"));
timeSeries2.add(time4, new StringType("four2"));

manager.timeSeriesUpdated(TEST_ITEM, timeSeries);
manager.timeSeriesUpdated(TEST_ITEM, timeSeries2);

// verify removal of old elements from service
ArgumentCaptor<FilterCriteria> filterCaptor = ArgumentCaptor.forClass(FilterCriteria.class);

InOrder inOrder = inOrder(modifiablePersistenceServiceMock);

inOrder.verify(modifiablePersistenceServiceMock).remove(filterCaptor.capture());
inOrder.verify(modifiablePersistenceServiceMock, times(3)).store(any(Item.class), any(ZonedDateTime.class),
any(State.class));

FilterCriteria filterCriteria = filterCaptor.getValue();
assertThat(filterCriteria.getItemName(), is(TEST_ITEM_NAME));
assertThat(filterCriteria.getBeginDate(), is(time1.atZone(ZoneId.systemDefault())));
assertThat(filterCriteria.getBeginDate(), is(time1.atZone(ZoneId.systemDefault())));
assertThat(filterCriteria.getBeginDate(), is(time3.atZone(ZoneId.systemDefault())));
assertThat(filterCriteria.getEndDate(), is(time4.atZone(ZoneId.systemDefault())));

// verify second and third restore-future are cancelled
verify(futures.get(0), never()).cancel(anyBoolean());
verify(futures.get(1)).cancel(true);
verify(futures.get(2)).cancel(true);

// verify new values are stored
inOrder.verify(modifiablePersistenceServiceMock, times(2)).store(any(Item.class), any(ZonedDateTime.class),
any(State.class));
// verify new restore futures are scheduled
inOrder.verify(schedulerMock).at(any(SchedulerRunnable.class), ArgumentMatchers.eq(time3));
inOrder.verify(schedulerMock).at(any(SchedulerRunnable.class), ArgumentMatchers.eq(time4));

inOrder.verifyNoMoreInteractions();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@

import java.time.Instant;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Stream;

import org.eclipse.jdt.annotation.NonNullByDefault;
Expand All @@ -27,41 +30,70 @@
*/
@NonNullByDefault
public class TimeSeries {
private final List<Entry> states = new ArrayList<>();
private final TreeSet<Entry> states = new TreeSet<>(Comparator.comparing(e -> e.timestamp));
private final Policy policy;
private Instant begin = Instant.MAX;
private Instant end = Instant.MIN;

public TimeSeries(Policy policy) {
this.policy = policy;
}

/**
* Get the persistence policy of this series.
* <p/>
* {@link Policy#ADD} add the content to the persistence, {@link Policy#REPLACE} first removes all persisted elements in the timespan given by {@link #getBegin()} and {@link #getEnd()}.
*
* @return
*/
public Policy getPolicy() {
return policy;
}

/**
* Get the timestamp of the first element in this series.
*
* @return the {@link Instant} of the first element
*/
public Instant getBegin() {
return begin;
return states.isEmpty() ? Instant.MAX : states.first().timestamp();
}

/**
* Get the timestamp of the last element in this series.
*
* @return the {@link Instant} of the last element
*/
public Instant getEnd() {
return end;
return states.isEmpty() ? Instant.MIN : states.last().timestamp();
}

/**
* Get the number of elements in this series.
*
* @return the number of elements
*/
public int size() {
return states.size();
}

/**
* Add a new element to this series.
* <p/>
* Elements can be added in an arbitrary order and are sorted chronologically.
*
* @param timestamp an {@link Instant} for the given state
* @param state the {@link State} at the given timestamp
*/
public void add(Instant timestamp, State state) {
states.add(new Entry(timestamp, state));
if (begin == null || timestamp.isBefore(begin)) {
begin = timestamp;
}
if (end == null || timestamp.isAfter(end)) {
end = timestamp;
}
}

/**
* Get the content of this series.
* <p/>
* The entries are returned in chronological order, earlier entries before later entries.
*
* @return a {@link <Stream<Entry>} with the content of this series.
*/
public Stream<Entry> getStates() {
return List.copyOf(states).stream();
}
Expand Down

0 comments on commit bb63574

Please sign in to comment.